1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/dom/ReadableStreamDefaultController.h"
9 #include "js/Exception.h"
10 #include "js/TypeDecls.h"
12 #include "mozilla/AlreadyAddRefed.h"
13 #include "mozilla/Attributes.h"
14 #include "mozilla/HoldDropJSObjects.h"
15 #include "mozilla/dom/Promise.h"
16 #include "mozilla/dom/Promise-inl.h"
17 #include "mozilla/dom/ReadableStream.h"
18 #include "mozilla/dom/ReadableStreamController.h"
19 #include "mozilla/dom/ReadableStreamDefaultControllerBinding.h"
20 #include "mozilla/dom/ReadableStreamDefaultReaderBinding.h"
21 #include "mozilla/dom/UnderlyingSourceBinding.h"
22 #include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
23 #include "nsCycleCollectionParticipant.h"
24 #include "nsISupports.h"
26 namespace mozilla::dom
{
28 using namespace streams_abstract
;
// Cycle-collection and refcounting boilerplate for the
// ReadableStreamController base class: the collector is told about mGlobal
// and mAlgorithms, and the interface map answers for nsISupports.
// NOTE(review): the member list below ends on a dangling comma and no
// NS_INTERFACE_MAP_END is visible in this copy -- lines appear to be missing;
// confirm against the canonical file.
30 NS_IMPL_CYCLE_COLLECTION(ReadableStreamController
, mGlobal
, mAlgorithms
,
32 NS_IMPL_CYCLE_COLLECTING_ADDREF(ReadableStreamController
)
33 NS_IMPL_CYCLE_COLLECTING_RELEASE(ReadableStreamController
)
35 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ReadableStreamController
)
36 NS_INTERFACE_MAP_ENTRY(nsISupports
)
// Base-class constructor; takes the global object the controller belongs to.
// NOTE(review): the initializer list and body are not visible in this copy;
// presumably aGlobal ends up in mGlobal -- confirm.
39 ReadableStreamController::ReadableStreamController(nsIGlobalObject
* aGlobal
)
// Associates this controller with aStream.
// NOTE(review): the body of SetStream is not visible in this copy.
42 void ReadableStreamController::SetStream(ReadableStream
* aStream
) {
// Cycle-collection, refcounting and interface-map boilerplate for
// ReadableStreamDefaultController, layered on top of the
// ReadableStreamController base class.
46 // Note: Using the individual macros vs NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE
47 // because I need to specify a manual implementation of
48 // NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN.
49 NS_IMPL_CYCLE_COLLECTION_CLASS(ReadableStreamDefaultController
)
// Unlink: drop the strategy size algorithm and the preserved JS wrapper.
51 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN_INHERITED(ReadableStreamDefaultController
,
52 ReadableStreamController
)
53 NS_IMPL_CYCLE_COLLECTION_UNLINK(mStrategySizeAlgorithm
)
55 NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
56 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
// Traverse: report the strategy size algorithm to the cycle collector.
58 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(
59 ReadableStreamDefaultController
, ReadableStreamController
)
60 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mStrategySizeAlgorithm
)
61 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
// Trace: besides the preserved wrapper, every value stored in mQueue is a JS
// value held by this object and has to be traced by hand, which is why the
// combined wrapper-cache macro cannot be used here.
63 NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN_INHERITED(ReadableStreamDefaultController
,
64 ReadableStreamController
)
65 NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
66 // Trace the associated queue.
67 for (const auto& queueEntry
: tmp
->mQueue
) {
68 aCallbacks
.Trace(&queueEntry
->mValue
, "mQueue.mValue", aClosure
);
70 NS_IMPL_CYCLE_COLLECTION_TRACE_END
// Refcounting is inherited from ReadableStreamController.
72 NS_IMPL_ADDREF_INHERITED(ReadableStreamDefaultController
,
73 ReadableStreamController
)
74 NS_IMPL_RELEASE_INHERITED(ReadableStreamDefaultController
,
75 ReadableStreamController
)
// Interface map: wrapper-cache entry, then defer to the base class.
77 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ReadableStreamDefaultController
)
78 NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
79 NS_INTERFACE_MAP_END_INHERITING(ReadableStreamController
)
// Constructs a default controller for aGlobal, forwarding the global to the
// ReadableStreamController base class.
81 ReadableStreamDefaultController::ReadableStreamDefaultController(
82 nsIGlobalObject
* aGlobal
)
83 : ReadableStreamController(aGlobal
) {
// Register with the JS holder machinery; the destructor makes the matching
// DropJSObjects call.
84 mozilla::HoldJSObjects(this);
// Destructor: undoes the HoldJSObjects registration made in the constructor.
87 ReadableStreamDefaultController::~ReadableStreamDefaultController() {
88 // MG:XXX: LinkedLists are required to be empty at destruction, but it seems
89 // it is possible to have a controller be destructed while still
90 // having entries in its queue.
// NOTE(review): the statement this comment refers to (presumably one that
// empties the queue) is not visible in this copy -- confirm.
92 // This needs to be verified as not indicating some other issue.
93 mozilla::DropJSObjects(this);
// Creates the JS reflector for this controller by delegating to the generated
// ReadableStreamDefaultController binding, using aGivenProto as the prototype
// argument.
97 JSObject
* ReadableStreamDefaultController::WrapObject(
98 JSContext
* aCx
, JS::Handle
<JSObject
*> aGivenProto
) {
99 return ReadableStreamDefaultController_Binding::Wrap(aCx
, this, aGivenProto
);
102 namespace streams_abstract
{
104 // https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue
// Returns true only while the controller's stream is still "readable" and no
// close has been requested on the controller.
105 static bool ReadableStreamDefaultControllerCanCloseOrEnqueue(
106 ReadableStreamDefaultController
* aController
) {
107 // Step 1. Let state be controller.[[stream]].[[state]].
108 ReadableStream::ReaderState state
= aController
->Stream()->State();
110 // Step 2. If controller.[[closeRequested]] is false and state is "readable",
// return true.
112 // Step 3. Return false.
// Steps 2 and 3 are folded into a single boolean expression.
113 return !aController
->CloseRequested() &&
114 state
== ReadableStream::ReaderState::Readable
;
117 // https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue
118 // This is a variant of ReadableStreamDefaultControllerCanCloseOrEnqueue
119 // that also throws when the function would return false to improve error
// messages.
//
// aCloseOrEnqueue selects which operation the caller is attempting and only
// affects the wording of the TypeError reported through aRv.
121 bool ReadableStreamDefaultControllerCanCloseOrEnqueueAndThrow(
122 ReadableStreamDefaultController
* aController
,
123 CloseOrEnqueue aCloseOrEnqueue
, ErrorResult
& aRv
) {
124 // Step 1. Let state be controller.[[stream]].[[state]].
125 ReadableStream::ReaderState state
= aController
->Stream()->State();
// Pick the message prefix matching the attempted operation; each failure case
// below appends the reason.
// NOTE(review): the declaration of `prefix`, the else branch header and the
// switch on `state` are not visible in this copy -- confirm.
128 if (aCloseOrEnqueue
== CloseOrEnqueue::Close
) {
129 prefix
= "Cannot close a stream that "_ns
;
131 prefix
= "Cannot enqueue into a stream that "_ns
;
135 case ReadableStream::ReaderState::Readable
:
136 // Step 2. If controller.[[closeRequested]] is false and
137 // state is "readable", return true.
138 // Note: We don't error/check for [[closeRequested]] first, because
139 // [[closeRequested]] is still true even after the state is "closed".
140 // This doesn't cause any spec observable difference.
141 if (!aController
->CloseRequested()) {
145 // Step 3. Return false.
// Still readable, but a close has already been requested.
146 aRv
.ThrowTypeError(prefix
+ "has already been requested to close."_ns
);
149 case ReadableStream::ReaderState::Closed
:
150 aRv
.ThrowTypeError(prefix
+ "is already closed."_ns
);
153 case ReadableStream::ReaderState::Errored
:
154 aRv
.ThrowTypeError(prefix
+ "has errored."_ns
);
// Every ReaderState value is handled above, so reaching this point means an
// unexpected state value.
158 MOZ_ASSERT_UNREACHABLE("Unknown ReaderState");
// https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size
// Computes how much more the queue may grow before reaching the strategy's
// high water mark; errored and closed streams are special-cased first.
163 Nullable
<double> ReadableStreamDefaultControllerGetDesiredSize(
164 ReadableStreamDefaultController
* aController
) {
165 ReadableStream::ReaderState state
= aController
->Stream()->State();
// NOTE(review): the bodies of the two branches below are not visible in this
// copy; the spec returns null for "errored" and 0 for "closed" -- confirm.
166 if (state
== ReadableStream::ReaderState::Errored
) {
170 if (state
== ReadableStream::ReaderState::Closed
) {
// Readable stream: high water mark minus what is currently queued.
174 return aController
->StrategyHWM() - aController
->QueueTotalSize();
177 } // namespace streams_abstract
179 // https://streams.spec.whatwg.org/#rs-default-controller-desired-size
// Method entry point for desiredSize; simply forwards to the abstract
// operation with this controller.
180 Nullable
<double> ReadableStreamDefaultController::GetDesiredSize() {
182 return ReadableStreamDefaultControllerGetDesiredSize(this);
185 namespace streams_abstract
{
187 // https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms
//
189 // Note: nullptr is used to indicate we run the default algorithm at the
// moment,
191 // so the below doesn't quite match the spec, but serves the correct
192 // purpose for disconnecting the algorithms from the object graph to allow
// collection.
// NOTE(review): the words "moment," and "collection." above were restored
// from context; the original comment lines are missing in this copy.
//
195 // As far as I know, this isn't currently visible, but we need to keep
196 // this in mind. This is a weakness of this current implementation, and
197 // I'd prefer to have a better answer here eventually.
198 void ReadableStreamDefaultControllerClearAlgorithms(
199 ReadableStreamDefaultController
* aController
) {
// Drop the underlying-source algorithms held by the controller.
202 aController
->ClearAlgorithms();
// Drop the strategy size algorithm as well.
205 aController
->setStrategySizeAlgorithm(nullptr);
208 // https://streams.spec.whatwg.org/#readable-stream-default-controller-close
// Marks the controller as close-requested and, when nothing is left in its
// queue, clears the algorithms and closes the stream right away.
// NOTE(review): the tail of the signature (the aRv parameter used below) and
// the body of the first branch are not visible in this copy.
209 void ReadableStreamDefaultControllerClose(
210 JSContext
* aCx
, ReadableStreamDefaultController
* aController
,
// Guard: only proceed when closing is currently allowed.
213 if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(aController
)) {
// Hold a strong reference to the stream for the rest of the function.
218 RefPtr
<ReadableStream
> stream
= aController
->Stream();
221 aController
->SetCloseRequested(true);
// With an empty queue there is nothing left to deliver, so close now.
224 if (aController
->Queue().isEmpty()) {
226 ReadableStreamDefaultControllerClearAlgorithms(aController
);
229 ReadableStreamClose(aCx
, stream
, aRv
);
233 } // namespace streams_abstract
235 // https://streams.spec.whatwg.org/#rs-default-controller-close
// Method entry point for close(): validates with the throwing variant of the
// can-close-or-enqueue check (which reports a TypeError through aRv), then
// runs the abstract close operation.
236 void ReadableStreamDefaultController::Close(JSContext
* aCx
, ErrorResult
& aRv
) {
// NOTE(review): the body of this branch (presumably an early return) is not
// visible in this copy.
238 if (!ReadableStreamDefaultControllerCanCloseOrEnqueueAndThrow(
239 this, CloseOrEnqueue::Close
, aRv
)) {
244 ReadableStreamDefaultControllerClose(aCx
, this, aRv
);
247 namespace streams_abstract
{
// Forward declaration: the function is defined further down in this file but
// is already called from ReadableStreamDefaultControllerEnqueue below.
// NOTE(review): the end of the parameter list is not visible in this copy.
249 MOZ_CAN_RUN_SCRIPT
static void ReadableStreamDefaultControllerCallPullIfNeeded(
250 JSContext
* aCx
, ReadableStreamDefaultController
* aController
,
253 // https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue
// Delivers aChunk to the stream: if a reader is already waiting the chunk
// fulfils its read request directly, otherwise the chunk is sized with the
// strategy size algorithm and put on the controller's queue. A failure while
// sizing or queueing errors the stream and is re-thrown through aRv.
// Finishes by asking the underlying source for more data if needed.
// NOTE(review): several lines of this function (early returns, the else
// header, the chunkSize declaration, closing braces) are not visible in this
// copy; the comments below describe only the statements that are.
254 void ReadableStreamDefaultControllerEnqueue(
255 JSContext
* aCx
, ReadableStreamDefaultController
* aController
,
256 JS::Handle
<JS::Value
> aChunk
, ErrorResult
& aRv
) {
// Guard: only proceed when enqueuing is currently allowed.
258 if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(aController
)) {
263 RefPtr
<ReadableStream
> stream
= aController
->Stream();
// Fast path: a locked stream with pending read requests gets the chunk
// handed over directly.
266 if (IsReadableStreamLocked(stream
) &&
267 ReadableStreamGetNumReadRequests(stream
) > 0) {
268 ReadableStreamFulfillReadRequest(aCx
, stream
, aChunk
, false, aRv
);
// Slow path: compute the chunk's size and queue it.
271 Optional
<JS::Handle
<JS::Value
>> optionalChunk(aCx
, aChunk
);
273 // Step 4.3 (Re-ordered);
274 RefPtr
<QueuingStrategySize
> sizeAlgorithm(
275 aController
->StrategySizeAlgorithm());
277 // If !sizeAlgorithm, we return 1, which is inlined from
278 // https://streams.spec.whatwg.org/#make-size-algorithm-from-size-function
// eRethrowExceptions: exceptions thrown by the size callback are surfaced on
// aRv rather than reported, so they can be handled below.
281 ? sizeAlgorithm
->Call(
283 "ReadableStreamDefaultController.[[strategySizeAlgorithm]]",
284 CallbackObject::eRethrowExceptions
)
287 // If this is an uncatchable exception we can't continue.
288 if (aRv
.IsUncatchableException()) {
// The size algorithm threw: turn the failure into a JS value, error the
// stream with it, then re-throw the same value to the caller.
293 if (aRv
.MaybeSetPendingException(
294 aCx
, "ReadableStreamDefaultController.enqueue")) {
295 JS::Rooted
<JS::Value
> errorValue(aCx
);
297 JS_GetPendingException(aCx
, &errorValue
);
301 ReadableStreamDefaultControllerError(aCx
, aController
, errorValue
, aRv
);
306 // Step 4.2.2 Caller must treat aRv as if it were a completion
308 aRv
.MightThrowJSException();
309 aRv
.ThrowJSException(aCx
, errorValue
);
// Queue the chunk together with its computed size.
314 EnqueueValueWithSize(aController
, aChunk
, chunkSize
, aRv
);
317 // Note we convert the pending exception to a JS value here, and then
318 // re-throw it because we save this exception and re-expose it elsewhere
319 // and there are tests to ensure the identity of these errors are the same.
320 if (aRv
.MaybeSetPendingException(
321 aCx
, "ReadableStreamDefaultController.enqueue")) {
322 JS::Rooted
<JS::Value
> errorValue(aCx
);
324 if (!JS_GetPendingException(aCx
, &errorValue
)) {
325 // Uncatchable exception; we should mark aRv and return.
326 aRv
.StealExceptionFromJSContext(aCx
);
// The value has been captured; clear it from the context before erroring the
// stream.
329 JS_ClearPendingException(aCx
);
332 ReadableStreamDefaultControllerError(aCx
, aController
, errorValue
, aRv
);
337 // Step 4.5.2 Caller must treat aRv as if it were a completion
339 aRv
.MightThrowJSException();
340 aRv
.ThrowJSException(aCx
, errorValue
);
// Finally, ask the underlying source for more data if needed.
346 ReadableStreamDefaultControllerCallPullIfNeeded(aCx
, aController
, aRv
);
349 } // namespace streams_abstract
351 // https://streams.spec.whatwg.org/#rs-default-controller-enqueue
// Method entry point for enqueue(): validates with the throwing variant of
// the can-close-or-enqueue check (which reports a TypeError through aRv),
// then runs the abstract enqueue operation with aChunk.
// NOTE(review): the tail of the signature and the body of the guard branch
// (presumably an early return) are not visible in this copy.
352 void ReadableStreamDefaultController::Enqueue(JSContext
* aCx
,
353 JS::Handle
<JS::Value
> aChunk
,
356 if (!ReadableStreamDefaultControllerCanCloseOrEnqueueAndThrow(
357 this, CloseOrEnqueue::Enqueue
, aRv
)) {
362 ReadableStreamDefaultControllerEnqueue(aCx
, this, aChunk
, aRv
);
// https://streams.spec.whatwg.org/#rs-default-controller-error
// Method entry point for error(): forwards aError straight to the abstract
// error operation for this controller.
// NOTE(review): the tail of the signature is not visible in this copy.
365 void ReadableStreamDefaultController::Error(JSContext
* aCx
,
366 JS::Handle
<JS::Value
> aError
,
368 ReadableStreamDefaultControllerError(aCx
, this, aError
, aRv
);
371 namespace streams_abstract
{
373 // https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull
// Decides whether the underlying source's pull algorithm should run now.
// NOTE(review): the bodies of the three branches below are not visible in
// this copy; per the spec they return false, false and true respectively --
// confirm.
374 bool ReadableStreamDefaultControllerShouldCallPull(
375 ReadableStreamDefaultController
* aController
) {
377 ReadableStream
* stream
= aController
->Stream();
// Stream no longer accepts chunks.
380 if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(aController
)) {
// Start algorithm has not completed yet.
385 if (!aController
->Started()) {
// A reader is already waiting for data.
390 if (IsReadableStreamLocked(stream
) &&
391 ReadableStreamGetNumReadRequests(stream
) > 0) {
// Otherwise pull only while the queue is below the high water mark.
396 Nullable
<double> desiredSize
=
397 ReadableStreamDefaultControllerGetDesiredSize(aController
);
400 MOZ_ASSERT(!desiredSize
.IsNull());
403 return desiredSize
.Value() > 0;
406 // https://streams.spec.whatwg.org/#readable-stream-default-controller-error
// Moves the controller's stream into the errored state with aValue: the
// queue is reset, the algorithms are cleared, then the stream is errored.
407 void ReadableStreamDefaultControllerError(
408 JSContext
* aCx
, ReadableStreamDefaultController
* aController
,
409 JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
) {
411 ReadableStream
* stream
= aController
->Stream();
// Only a readable stream can be errored.
// NOTE(review): the body of this branch (presumably an early return) is not
// visible in this copy.
414 if (stream
->State() != ReadableStream::ReaderState::Readable
) {
// Discard anything still queued.
419 ResetQueue(aController
);
422 ReadableStreamDefaultControllerClearAlgorithms(aController
);
425 ReadableStreamError(aCx
, stream
, aValue
, aRv
);
428 // https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed
// Invokes the underlying source's pull algorithm when
// ReadableStreamDefaultControllerShouldCallPull says so. While a pull is in
// flight, further requests only set the pull-again flag; the flag is acted on
// when the pull promise settles.
// NOTE(review): the tail of the signature, the early returns and some closing
// braces are not visible in this copy.
429 static void ReadableStreamDefaultControllerCallPullIfNeeded(
430 JSContext
* aCx
, ReadableStreamDefaultController
* aController
,
433 bool shouldPull
= ReadableStreamDefaultControllerShouldCallPull(aController
);
// A pull is already running: remember that another one is wanted.
441 if (aController
->Pulling()) {
443 aController
->SetPullAgain(true);
449 MOZ_ASSERT(!aController
->PullAgain());
452 aController
->SetPulling(true);
// Take a strong reference to the algorithms before calling the pull callback.
455 RefPtr
<UnderlyingSourceAlgorithmsBase
> algorithms
=
456 aController
->GetAlgorithms();
457 RefPtr
<Promise
> pullPromise
=
458 algorithms
->PullCallback(aCx
, *aController
, aRv
);
// First lambda handles fulfilment, second handles rejection; the controller
// is passed along as the extra argument (the RefPtr at the end of the call).
464 pullPromise
->AddCallbacksWithCycleCollectedArgs(
465 [](JSContext
* aCx
, JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
,
466 ReadableStreamDefaultController
* mController
)
467 MOZ_CAN_RUN_SCRIPT_BOUNDARY
{
// The pull finished.
469 mController
->SetPulling(false);
// Another pull was requested while this one was running: run it now.
471 if (mController
->PullAgain()) {
473 mController
->SetPullAgain(false);
477 ReadableStreamDefaultControllerCallPullIfNeeded(
478 aCx
, MOZ_KnownLive(mController
), aRv
);
// Rejection: error the stream with the rejection value.
481 [](JSContext
* aCx
, JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
,
482 ReadableStreamDefaultController
* mController
) {
484 ReadableStreamDefaultControllerError(aCx
, mController
, aValue
, aRv
);
486 RefPtr(aController
));
489 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller
// Wires aController to aStream: resets the controller's state, installs the
// queuing strategy (aSizeAlgorithm, aHighWaterMark) and aAlgorithms, runs the
// start callback, and schedules the first pull once the start result settles.
// NOTE(review): some lines (e.g. error checks after the start callback and
// closing braces) are not visible in this copy.
490 void SetUpReadableStreamDefaultController(
491 JSContext
* aCx
, ReadableStream
* aStream
,
492 ReadableStreamDefaultController
* aController
,
493 UnderlyingSourceAlgorithmsBase
* aAlgorithms
, double aHighWaterMark
,
494 QueuingStrategySize
* aSizeAlgorithm
, ErrorResult
& aRv
) {
// The stream must not already have a controller.
496 MOZ_ASSERT(!aStream
->Controller());
499 aController
->SetStream(aStream
);
// Start from an empty queue.
502 ResetQueue(aController
);
// All state flags start out false.
505 aController
->SetStarted(false);
506 aController
->SetCloseRequested(false);
507 aController
->SetPullAgain(false);
508 aController
->SetPulling(false);
// Install the queuing strategy.
511 aController
->setStrategySizeAlgorithm(aSizeAlgorithm
);
512 aController
->SetStrategyHWM(aHighWaterMark
);
516 aController
->SetAlgorithms(*aAlgorithms
);
519 aStream
->SetController(*aController
);
521 // Step 9. Default algorithm returns undefined. See Step 2 of
522 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller
523 JS::Rooted
<JS::Value
> startResult(aCx
, JS::UndefinedValue());
// Strong reference to the controller for the start callback.
524 RefPtr
<ReadableStreamDefaultController
> controller
= aController
;
525 aAlgorithms
->StartCallback(aCx
, *controller
, &startResult
, aRv
);
// Wrap the start result in a promise so both plain and promise results are
// handled by the callbacks below.
531 RefPtr
<Promise
> startPromise
=
532 Promise::CreateInfallible(aStream
->GetParentObject());
533 startPromise
->MaybeResolve(startResult
);
// First lambda handles fulfilment, second handles rejection; the controller
// is passed along as the extra argument (the RefPtr at the end of the call).
536 startPromise
->AddCallbacksWithCycleCollectedArgs(
537 [](JSContext
* aCx
, JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
,
538 ReadableStreamDefaultController
* aController
)
539 MOZ_CAN_RUN_SCRIPT_BOUNDARY
{
540 MOZ_ASSERT(aController
);
// Start completed: mark the controller started and attempt the first pull.
543 aController
->SetStarted(true);
546 aController
->SetPulling(false);
549 aController
->SetPullAgain(false);
552 ReadableStreamDefaultControllerCallPullIfNeeded(
553 aCx
, MOZ_KnownLive(aController
), aRv
);
// Start failed: error the stream with the rejection value.
556 [](JSContext
* aCx
, JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
,
557 ReadableStreamDefaultController
* aController
) {
559 ReadableStreamDefaultControllerError(aCx
, aController
, aValue
, aRv
);
561 RefPtr(aController
));
564 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller-from-underlying-source
// Creates a fresh default controller and an UnderlyingSourceAlgorithms object
// built from the given underlying source object and dictionary, both bound to
// aStream's parent object, then hands everything to
// SetUpReadableStreamDefaultController.
565 void SetupReadableStreamDefaultControllerFromUnderlyingSource(
566 JSContext
* aCx
, ReadableStream
* aStream
,
567 JS::Handle
<JSObject
*> aUnderlyingSource
,
568 UnderlyingSource
& aUnderlyingSourceDict
, double aHighWaterMark
,
569 QueuingStrategySize
* aSizeAlgorithm
, ErrorResult
& aRv
) {
// New controller for the stream's global.
571 RefPtr
<ReadableStreamDefaultController
> controller
=
572 new ReadableStreamDefaultController(aStream
->GetParentObject());
// Algorithms object built from the underlying source and its dictionary.
575 RefPtr
<UnderlyingSourceAlgorithms
> algorithms
=
576 new UnderlyingSourceAlgorithms(aStream
->GetParentObject(),
577 aUnderlyingSource
, aUnderlyingSourceDict
);
// Delegate the actual wiring.
580 SetUpReadableStreamDefaultController(aCx
, aStream
, controller
, algorithms
,
581 aHighWaterMark
, aSizeAlgorithm
, aRv
);
584 } // namespace streams_abstract
586 // https://streams.spec.whatwg.org/#rs-default-controller-private-cancel
// Runs the underlying source's cancel algorithm with aReason, clears the
// controller's algorithms afterwards, and returns the promise produced by the
// cancel callback.
// NOTE(review): lines between the statements below (e.g. any queue reset or
// aRv check) are not visible in this copy.
587 already_AddRefed
<Promise
> ReadableStreamDefaultController::CancelSteps(
588 JSContext
* aCx
, JS::Handle
<JS::Value
> aReason
, ErrorResult
& aRv
) {
// Wrap the reason as an Optional for the callback.
593 Optional
<JS::Handle
<JS::Value
>> errorOption(aCx
, aReason
);
// Take a strong reference so the algorithms object is still held after
// ReadableStreamDefaultControllerClearAlgorithms runs below.
594 RefPtr
<UnderlyingSourceAlgorithmsBase
> algorithms
= mAlgorithms
;
595 RefPtr
<Promise
> result
= algorithms
->CancelCallback(aCx
, errorOption
, aRv
);
601 ReadableStreamDefaultControllerClearAlgorithms(this);
604 return result
.forget();
607 // https://streams.spec.whatwg.org/#rs-default-controller-private-pull
// Services one read request. When a chunk is queued it is dequeued and handed
// to aReadRequest's chunk steps (closing the stream first if that was the
// last chunk and a close was requested, otherwise asking for more data).
// When the queue is empty the request is parked on the stream and a pull is
// attempted.
// NOTE(review): the tail of the signature, the else headers and closing
// braces are not visible in this copy.
608 void ReadableStreamDefaultController::PullSteps(JSContext
* aCx
,
609 ReadRequest
* aReadRequest
,
// Hold a strong reference to the stream for the rest of the function.
612 RefPtr
<ReadableStream
> stream
= mStream
;
// Queue has data: take the next chunk.
615 if (!mQueue
.isEmpty()) {
617 JS::Rooted
<JS::Value
> chunk(aCx
);
618 DequeueValue(this, &chunk
);
// That was the last chunk and a close was requested: finish closing now.
621 if (CloseRequested() && mQueue
.isEmpty()) {
623 ReadableStreamDefaultControllerClearAlgorithms(this);
625 ReadableStreamClose(aCx
, stream
, aRv
);
// Otherwise try to refill the queue.
631 ReadableStreamDefaultControllerCallPullIfNeeded(aCx
, this, aRv
);
// Deliver the dequeued chunk to the read request.
638 aReadRequest
->ChunkSteps(aCx
, chunk
, aRv
);
// Queue was empty: park the request on the stream and try to pull.
642 ReadableStreamAddReadRequest(stream
, aReadRequest
);
644 ReadableStreamDefaultControllerCallPullIfNeeded(aCx
, this, aRv
);
648 // https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultcontroller-releasesteps
// Release steps for the default controller.
// NOTE(review): the body is not visible in this copy; the linked spec
// algorithm consists of a single "Return." step -- confirm the body is empty.
649 void ReadableStreamDefaultController::ReleaseSteps() {
653 } // namespace mozilla::dom