1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/dom/ReadableStream.h"
9 #include "ReadIntoRequest.h"
10 #include "ReadableStreamPipeTo.h"
11 #include "ReadableStreamTee.h"
12 #include "StreamUtils.h"
15 #include "js/Exception.h"
16 #include "js/PropertyAndElement.h"
17 #include "js/TypeDecls.h"
19 #include "mozilla/AlreadyAddRefed.h"
20 #include "mozilla/Assertions.h"
21 #include "mozilla/Attributes.h"
22 #include "mozilla/CycleCollectedJSContext.h"
23 #include "mozilla/FloatingPoint.h"
24 #include "mozilla/HoldDropJSObjects.h"
25 #include "mozilla/StaticPrefs_dom.h"
26 #include "mozilla/dom/BindingCallContext.h"
27 #include "mozilla/dom/ByteStreamHelpers.h"
28 #include "mozilla/dom/BodyStream.h"
29 #include "mozilla/dom/QueueWithSizes.h"
30 #include "mozilla/dom/QueuingStrategyBinding.h"
31 #include "mozilla/dom/ReadRequest.h"
32 #include "mozilla/dom/ReadableByteStreamController.h"
33 #include "mozilla/dom/ReadableStreamBYOBReader.h"
34 #include "mozilla/dom/ReadableStreamBinding.h"
35 #include "mozilla/dom/ReadableStreamController.h"
36 #include "mozilla/dom/ReadableStreamDefaultController.h"
37 #include "mozilla/dom/ReadableStreamDefaultReader.h"
38 #include "mozilla/dom/RootedDictionary.h"
39 #include "mozilla/dom/ScriptSettings.h"
40 #include "mozilla/dom/UnderlyingSourceBinding.h"
41 #include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
42 #include "mozilla/dom/WritableStream.h"
43 #include "mozilla/dom/WritableStreamDefaultWriter.h"
46 #include "mozilla/dom/Promise-inl.h"
47 #include "nsIGlobalObject.h"
48 #include "nsISupports.h"
50 inline void ImplCycleCollectionTraverse(
51 nsCycleCollectionTraversalCallback
& aCallback
,
52 mozilla::Variant
<mozilla::Nothing
,
53 RefPtr
<mozilla::dom::ReadableStreamDefaultReader
>>&
55 const char* aName
, uint32_t aFlags
= 0) {
56 if (aReader
.is
<RefPtr
<mozilla::dom::ReadableStreamDefaultReader
>>()) {
57 ImplCycleCollectionTraverse(
59 aReader
.as
<RefPtr
<mozilla::dom::ReadableStreamDefaultReader
>>(), aName
,
64 inline void ImplCycleCollectionUnlink(
65 mozilla::Variant
<mozilla::Nothing
,
66 RefPtr
<mozilla::dom::ReadableStreamDefaultReader
>>&
68 aReader
= AsVariant(mozilla::Nothing());
71 namespace mozilla::dom
{
73 // Only needed for refcounted objects.
74 NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE_WITH_JS_MEMBERS(
75 ReadableStream
, (mGlobal
, mController
, mReader
), (mStoredError
))
77 NS_IMPL_CYCLE_COLLECTING_ADDREF(ReadableStream
)
78 NS_IMPL_CYCLE_COLLECTING_RELEASE(ReadableStream
)
79 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ReadableStream
)
80 NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
81 NS_INTERFACE_MAP_ENTRY(nsISupports
)
84 ReadableStream::ReadableStream(nsIGlobalObject
* aGlobal
)
85 : mGlobal(aGlobal
), mReader(nullptr) {
86 mozilla::HoldJSObjects(this);
89 ReadableStream::ReadableStream(const GlobalObject
& aGlobal
)
90 : mGlobal(do_QueryInterface(aGlobal
.GetAsSupports())), mReader(nullptr) {
91 mozilla::HoldJSObjects(this);
94 ReadableStream::~ReadableStream() { mozilla::DropJSObjects(this); }
96 JSObject
* ReadableStream::WrapObject(JSContext
* aCx
,
97 JS::Handle
<JSObject
*> aGivenProto
) {
98 return ReadableStream_Binding::Wrap(aCx
, this, aGivenProto
);
101 ReadableStreamDefaultReader
* ReadableStream::GetDefaultReader() {
102 return mReader
->AsDefault();
105 void ReadableStream::SetReader(ReadableStreamGenericReader
* aReader
) {
109 // https://streams.spec.whatwg.org/#readable-stream-has-byob-reader
110 bool ReadableStreamHasBYOBReader(ReadableStream
* aStream
) {
111 // Step 1. Let reader be stream.[[reader]].
112 ReadableStreamGenericReader
* reader
= aStream
->GetReader();
114 // Step 2. If reader is undefined, return false.
119 // Step 3. If reader implements ReadableStreamBYOBReader, return true.
120 // Step 4. Return false.
121 return reader
->IsBYOB();
124 // https://streams.spec.whatwg.org/#readable-stream-has-default-reader
125 bool ReadableStreamHasDefaultReader(ReadableStream
* aStream
) {
126 // Step 1. Let reader be stream.[[reader]].
127 ReadableStreamGenericReader
* reader
= aStream
->GetReader();
129 // Step 2. If reader is undefined, return false.
134 // Step 3. If reader implements ReadableStreamDefaultReader, return true.
135 // Step 4. Return false.
136 return reader
->IsDefault();
139 // Streams Spec: 4.2.4: https://streams.spec.whatwg.org/#rs-prototype
141 already_AddRefed
<ReadableStream
> ReadableStream::Constructor(
142 const GlobalObject
& aGlobal
,
143 const Optional
<JS::Handle
<JSObject
*>>& aUnderlyingSource
,
144 const QueuingStrategy
& aStrategy
, ErrorResult
& aRv
) {
146 JS::Rooted
<JSObject
*> underlyingSourceObj(
148 aUnderlyingSource
.WasPassed() ? aUnderlyingSource
.Value() : nullptr);
151 RootedDictionary
<UnderlyingSource
> underlyingSourceDict(aGlobal
.Context());
152 if (underlyingSourceObj
) {
153 JS::Rooted
<JS::Value
> objValue(aGlobal
.Context(),
154 JS::ObjectValue(*underlyingSourceObj
));
155 dom::BindingCallContext
callCx(aGlobal
.Context(),
156 "ReadableStream.constructor");
157 aRv
.MightThrowJSException();
158 if (!underlyingSourceDict
.Init(callCx
, objValue
)) {
159 aRv
.StealExceptionFromJSContext(aGlobal
.Context());
165 RefPtr
<ReadableStream
> readableStream
= new ReadableStream(aGlobal
);
168 if (underlyingSourceDict
.mType
.WasPassed()) {
169 // Implicit assertion on above check.
170 MOZ_ASSERT(underlyingSourceDict
.mType
.Value() == ReadableStreamType::Bytes
);
173 if (aStrategy
.mSize
.WasPassed()) {
174 aRv
.ThrowRangeError("Implementation preserved member 'size'");
179 double highWaterMark
= ExtractHighWaterMark(aStrategy
, 0, aRv
);
185 SetUpReadableByteStreamControllerFromUnderlyingSource(
186 aGlobal
.Context(), readableStream
, underlyingSourceObj
,
187 underlyingSourceDict
, highWaterMark
, aRv
);
192 return readableStream
.forget();
195 // Step 5.1 (implicit in above check)
196 // Step 5.2. Extract callback.
198 // Implementation Note: The specification demands that if the size doesn't
199 // exist, we instead would provide an algorithm that returns 1. Instead, we
200 // will teach callers that a missing callback should simply return 1, rather
201 // than gin up a fake callback here.
203 // This decision may need to be revisited if the default action ever diverges
204 // within the specification.
205 RefPtr
<QueuingStrategySize
> sizeAlgorithm
=
206 aStrategy
.mSize
.WasPassed() ? &aStrategy
.mSize
.Value() : nullptr;
209 double highWaterMark
= ExtractHighWaterMark(aStrategy
, 1, aRv
);
215 SetupReadableStreamDefaultControllerFromUnderlyingSource(
216 aGlobal
.Context(), readableStream
, underlyingSourceObj
,
217 underlyingSourceDict
, highWaterMark
, sizeAlgorithm
, aRv
);
222 return readableStream
.forget();
225 // Dealing with const this ptr is a pain, so just re-implement.
226 // https://streams.spec.whatwg.org/#is-readable-stream-locked
227 bool ReadableStream::Locked() const {
232 // https://streams.spec.whatwg.org/#initialize-readable-stream
233 static void InitializeReadableStream(ReadableStream
* aStream
) {
235 aStream
->SetState(ReadableStream::ReaderState::Readable
);
238 aStream
->SetReader(nullptr);
239 aStream
->SetStoredError(JS::UndefinedHandleValue
);
242 aStream
->SetDisturbed(false);
245 // https://streams.spec.whatwg.org/#create-readable-stream
247 already_AddRefed
<ReadableStream
> CreateReadableStream(
248 JSContext
* aCx
, nsIGlobalObject
* aGlobal
,
249 UnderlyingSourceAlgorithmsBase
* aAlgorithms
,
250 mozilla::Maybe
<double> aHighWaterMark
, QueuingStrategySize
* aSizeAlgorithm
,
252 // Step 1. If highWaterMark was not passed, set it to 1.
253 double highWaterMark
= aHighWaterMark
.valueOr(1.0);
255 // Step 2. consumers of sizeAlgorithm
256 // handle null algorithms correctly.
258 MOZ_ASSERT(IsNonNegativeNumber(highWaterMark
));
260 RefPtr
<ReadableStream
> stream
= new ReadableStream(aGlobal
);
263 InitializeReadableStream(stream
);
266 RefPtr
<ReadableStreamDefaultController
> controller
=
267 new ReadableStreamDefaultController(aGlobal
);
270 SetUpReadableStreamDefaultController(aCx
, stream
, controller
, aAlgorithms
,
271 highWaterMark
, aSizeAlgorithm
, aRv
);
274 return stream
.forget();
277 // https://streams.spec.whatwg.org/#readable-stream-close
278 void ReadableStreamClose(JSContext
* aCx
, ReadableStream
* aStream
,
281 MOZ_ASSERT(aStream
->State() == ReadableStream::ReaderState::Readable
);
284 aStream
->SetState(ReadableStream::ReaderState::Closed
);
287 ReadableStreamGenericReader
* reader
= aStream
->GetReader();
295 reader
->ClosedPromise()->MaybeResolveWithUndefined();
298 if (reader
->IsDefault()) {
299 // Step 6.1. Let readRequests be reader.[[readRequests]].
300 // Move LinkedList out of DefaultReader onto stack to avoid the potential
301 // for concurrent modification, which could invalidate the iterator.
303 // See https://bugs.chromium.org/p/chromium/issues/detail?id=1045874 as an
304 // example of the kind of issue that could occur.
305 LinkedList
<RefPtr
<ReadRequest
>> readRequests
=
306 std::move(reader
->AsDefault()->ReadRequests());
308 // Step 6.2. Set reader.[[readRequests]] to an empty list.
309 // Note: The std::move already cleared this anyway.
310 reader
->AsDefault()->ReadRequests().clear();
312 // Step 6.3. For each readRequest of readRequests,
313 // Drain the local list and destroy elements along the way.
314 while (RefPtr
<ReadRequest
> readRequest
= readRequests
.popFirst()) {
315 // Step 6.3.1. Perform readRequest’s close steps.
316 readRequest
->CloseSteps(aCx
, aRv
);
324 // https://streams.spec.whatwg.org/#readable-stream-cancel
325 already_AddRefed
<Promise
> ReadableStreamCancel(JSContext
* aCx
,
326 ReadableStream
* aStream
,
327 JS::Handle
<JS::Value
> aError
,
330 aStream
->SetDisturbed(true);
333 if (aStream
->State() == ReadableStream::ReaderState::Closed
) {
334 RefPtr
<Promise
> promise
=
335 Promise::CreateInfallible(aStream
->GetParentObject());
336 promise
->MaybeResolveWithUndefined();
337 return promise
.forget();
341 if (aStream
->State() == ReadableStream::ReaderState::Errored
) {
342 JS::Rooted
<JS::Value
> storedError(aCx
, aStream
->StoredError());
343 return Promise::CreateRejected(aStream
->GetParentObject(), storedError
,
348 ReadableStreamClose(aCx
, aStream
, aRv
);
354 ReadableStreamGenericReader
* reader
= aStream
->GetReader();
357 if (reader
&& reader
->IsBYOB()) {
358 // Step 6.1. Let readIntoRequests be reader.[[readIntoRequests]].
359 LinkedList
<RefPtr
<ReadIntoRequest
>> readIntoRequests
=
360 std::move(reader
->AsBYOB()->ReadIntoRequests());
362 // Step 6.2. Set reader.[[readIntoRequests]] to an empty list.
363 // Note: The std::move already cleared this anyway.
364 reader
->AsBYOB()->ReadIntoRequests().clear();
366 // Step 6.3. For each readIntoRequest of readIntoRequests,
367 while (RefPtr
<ReadIntoRequest
> readIntoRequest
=
368 readIntoRequests
.popFirst()) {
369 // Step 6.3.1.Perform readIntoRequest’s close steps, given undefined.
370 readIntoRequest
->CloseSteps(aCx
, JS::UndefinedHandleValue
, aRv
);
378 RefPtr
<ReadableStreamController
> controller(aStream
->Controller());
379 RefPtr
<Promise
> sourceCancelPromise
=
380 controller
->CancelSteps(aCx
, aError
, aRv
);
386 RefPtr
<Promise
> promise
=
387 Promise::CreateInfallible(sourceCancelPromise
->GetParentObject());
389 // ThenWithCycleCollectedArgs will carry promise, keeping it alive until the
390 // callback executes.
391 Result
<RefPtr
<Promise
>, nsresult
> returnResult
=
392 sourceCancelPromise
->ThenWithCycleCollectedArgs(
393 [](JSContext
*, JS::Handle
<JS::Value
>, ErrorResult
&,
394 RefPtr
<Promise
> newPromise
) {
395 newPromise
->MaybeResolveWithUndefined();
396 return newPromise
.forget();
400 if (returnResult
.isErr()) {
401 aRv
.Throw(returnResult
.unwrapErr());
405 return returnResult
.unwrap().forget();
408 // https://streams.spec.whatwg.org/#rs-cancel
409 already_AddRefed
<Promise
> ReadableStream::Cancel(JSContext
* aCx
,
410 JS::Handle
<JS::Value
> aReason
,
412 // Step 1. If ! IsReadableStreamLocked(this) is true,
413 // return a promise rejected with a TypeError exception.
415 aRv
.ThrowTypeError("Cannot cancel a stream locked by a reader.");
419 // Step 2. Return ! ReadableStreamCancel(this, reason).
420 RefPtr
<ReadableStream
> thisRefPtr
= this;
421 return ReadableStreamCancel(aCx
, thisRefPtr
, aReason
, aRv
);
424 // https://streams.spec.whatwg.org/#acquire-readable-stream-reader
425 already_AddRefed
<ReadableStreamDefaultReader
>
426 AcquireReadableStreamDefaultReader(ReadableStream
* aStream
, ErrorResult
& aRv
) {
428 RefPtr
<ReadableStreamDefaultReader
> reader
=
429 new ReadableStreamDefaultReader(aStream
->GetParentObject());
432 SetUpReadableStreamDefaultReader(reader
, aStream
, aRv
);
438 return reader
.forget();
441 // https://streams.spec.whatwg.org/#rs-get-reader
442 void ReadableStream::GetReader(const ReadableStreamGetReaderOptions
& aOptions
,
443 OwningReadableStreamReader
& resultReader
,
445 // Step 1. If options["mode"] does not exist,
446 // return ? AcquireReadableStreamDefaultReader(this).
447 if (!aOptions
.mMode
.WasPassed()) {
448 RefPtr
<ReadableStreamDefaultReader
> defaultReader
=
449 AcquireReadableStreamDefaultReader(this, aRv
);
453 resultReader
.SetAsReadableStreamDefaultReader() = defaultReader
;
457 // Step 2. Assert: options["mode"] is "byob".
458 MOZ_ASSERT(aOptions
.mMode
.Value() == ReadableStreamReaderMode::Byob
);
460 // Step 3. Return ? AcquireReadableStreamBYOBReader(this).
461 RefPtr
<ReadableStreamBYOBReader
> byobReader
=
462 AcquireReadableStreamBYOBReader(this, aRv
);
466 resultReader
.SetAsReadableStreamBYOBReader() = byobReader
;
469 // https://streams.spec.whatwg.org/#is-readable-stream-locked
470 bool IsReadableStreamLocked(ReadableStream
* aStream
) {
472 return aStream
->Locked();
475 // https://streams.spec.whatwg.org/#rs-pipe-through
476 MOZ_CAN_RUN_SCRIPT already_AddRefed
<ReadableStream
> ReadableStream::PipeThrough(
477 const ReadableWritablePair
& aTransform
, const StreamPipeOptions
& aOptions
,
479 // Step 1: If ! IsReadableStreamLocked(this) is true, throw a TypeError
481 if (IsReadableStreamLocked(this)) {
482 aRv
.ThrowTypeError("Cannot pipe from a locked stream.");
486 // Step 2: If ! IsWritableStreamLocked(transform["writable"]) is true, throw a
487 // TypeError exception.
488 if (IsWritableStreamLocked(aTransform
.mWritable
)) {
489 aRv
.ThrowTypeError("Cannot pipe to a locked stream.");
493 // Step 3: Let signal be options["signal"] if it exists, or undefined
495 RefPtr
<AbortSignal
> signal
=
496 aOptions
.mSignal
.WasPassed() ? &aOptions
.mSignal
.Value() : nullptr;
498 // Step 4: Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
499 // options["preventClose"], options["preventAbort"], options["preventCancel"],
501 RefPtr
<WritableStream
> writable
= aTransform
.mWritable
;
502 RefPtr
<Promise
> promise
= ReadableStreamPipeTo(
503 this, writable
, aOptions
.mPreventClose
, aOptions
.mPreventAbort
,
504 aOptions
.mPreventCancel
, signal
, aRv
);
509 // Step 5: Set promise.[[PromiseIsHandled]] to true.
510 MOZ_ALWAYS_TRUE(promise
->SetAnyPromiseIsHandled());
512 // Step 6: Return transform["readable"].
513 return do_AddRef(aTransform
.mReadable
.get());
516 // https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests
517 double ReadableStreamGetNumReadRequests(ReadableStream
* aStream
) {
519 MOZ_ASSERT(ReadableStreamHasDefaultReader(aStream
));
522 return double(aStream
->GetDefaultReader()->ReadRequests().length());
525 // https://streams.spec.whatwg.org/#readable-stream-error
526 void ReadableStreamError(JSContext
* aCx
, ReadableStream
* aStream
,
527 JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
) {
529 MOZ_ASSERT(aStream
->State() == ReadableStream::ReaderState::Readable
);
532 aStream
->SetState(ReadableStream::ReaderState::Errored
);
535 aStream
->SetStoredError(aValue
);
538 ReadableStreamGenericReader
* reader
= aStream
->GetReader();
546 reader
->ClosedPromise()->MaybeReject(aValue
);
549 reader
->ClosedPromise()->SetSettledPromiseIsHandled();
552 if (reader
->IsDefault()) {
553 // Step 8.1. Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader,
555 RefPtr
<ReadableStreamDefaultReader
> defaultReader
= reader
->AsDefault();
556 ReadableStreamDefaultReaderErrorReadRequests(aCx
, defaultReader
, aValue
,
562 // Step 9. Otherwise,
563 // Step 9.1. Assert: reader implements ReadableStreamBYOBReader.
564 MOZ_ASSERT(reader
->IsBYOB());
566 // Step 9.2. Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader,
568 RefPtr
<ReadableStreamBYOBReader
> byobReader
= reader
->AsBYOB();
569 ReadableStreamBYOBReaderErrorReadIntoRequests(aCx
, byobReader
, aValue
, aRv
);
576 // https://streams.spec.whatwg.org/#rs-default-controller-close
577 void ReadableStreamFulfillReadRequest(JSContext
* aCx
, ReadableStream
* aStream
,
578 JS::Handle
<JS::Value
> aChunk
, bool aDone
,
581 MOZ_ASSERT(ReadableStreamHasDefaultReader(aStream
));
584 ReadableStreamDefaultReader
* reader
= aStream
->GetDefaultReader();
587 MOZ_ASSERT(!reader
->ReadRequests().isEmpty());
590 RefPtr
<ReadRequest
> readRequest
= reader
->ReadRequests().popFirst();
594 readRequest
->CloseSteps(aCx
, aRv
);
601 readRequest
->ChunkSteps(aCx
, aChunk
, aRv
);
604 // https://streams.spec.whatwg.org/#readable-stream-add-read-request
605 void ReadableStreamAddReadRequest(ReadableStream
* aStream
,
606 ReadRequest
* aReadRequest
) {
608 MOZ_ASSERT(aStream
->GetReader()->IsDefault());
610 MOZ_ASSERT(aStream
->State() == ReadableStream::ReaderState::Readable
);
612 aStream
->GetDefaultReader()->ReadRequests().insertBack(aReadRequest
);
615 // https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee
617 MOZ_CAN_RUN_SCRIPT already_AddRefed
<Promise
>
618 ReadableStreamDefaultTeeSourceAlgorithms::CancelCallback(
619 JSContext
* aCx
, const Optional
<JS::Handle
<JS::Value
>>& aReason
,
622 mTeeState
->SetCanceled(mBranch
, true);
625 mTeeState
->SetReason(mBranch
, aReason
.Value());
629 if (mTeeState
->Canceled(OtherTeeBranch(mBranch
))) {
632 JS::Rooted
<JSObject
*> compositeReason(aCx
, JS::NewArrayObject(aCx
, 2));
633 if (!compositeReason
) {
634 aRv
.StealExceptionFromJSContext(aCx
);
638 JS::Rooted
<JS::Value
> reason1(aCx
, mTeeState
->Reason1());
639 if (!JS_SetElement(aCx
, compositeReason
, 0, reason1
)) {
640 aRv
.StealExceptionFromJSContext(aCx
);
644 JS::Rooted
<JS::Value
> reason2(aCx
, mTeeState
->Reason2());
645 if (!JS_SetElement(aCx
, compositeReason
, 1, reason2
)) {
646 aRv
.StealExceptionFromJSContext(aCx
);
651 JS::Rooted
<JS::Value
> compositeReasonValue(
652 aCx
, JS::ObjectValue(*compositeReason
));
653 RefPtr
<ReadableStream
> stream(mTeeState
->GetStream());
654 RefPtr
<Promise
> cancelResult
=
655 ReadableStreamCancel(aCx
, stream
, compositeReasonValue
, aRv
);
661 mTeeState
->CancelPromise()->MaybeResolve(cancelResult
);
665 return do_AddRef(mTeeState
->CancelPromise());
668 // https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee
670 static void ReadableStreamDefaultTee(JSContext
* aCx
, ReadableStream
* aStream
,
671 bool aCloneForBranch2
,
672 nsTArray
<RefPtr
<ReadableStream
>>& aResult
,
677 // Steps 3-12 are contained in the construction of Tee State.
678 RefPtr
<TeeState
> teeState
= TeeState::Create(aStream
, aCloneForBranch2
, aRv
);
684 auto branch1Algorithms
= MakeRefPtr
<ReadableStreamDefaultTeeSourceAlgorithms
>(
685 teeState
, TeeBranch::Branch1
);
686 auto branch2Algorithms
= MakeRefPtr
<ReadableStreamDefaultTeeSourceAlgorithms
>(
687 teeState
, TeeBranch::Branch2
);
690 nsCOMPtr
<nsIGlobalObject
> global(
691 do_AddRef(teeState
->GetStream()->GetParentObject()));
692 teeState
->SetBranch1(CreateReadableStream(aCx
, global
, branch1Algorithms
,
693 mozilla::Nothing(), nullptr, aRv
));
699 teeState
->SetBranch2(CreateReadableStream(aCx
, global
, branch2Algorithms
,
700 mozilla::Nothing(), nullptr, aRv
));
706 teeState
->GetReader()->ClosedPromise()->AddCallbacksWithCycleCollectedArgs(
707 [](JSContext
* aCx
, JS::Handle
<JS::Value
> aValue
, ErrorResult
& aRv
,
708 TeeState
* aTeeState
) {},
709 [](JSContext
* aCx
, JS::Handle
<JS::Value
> aReason
, ErrorResult
& aRv
,
710 TeeState
* aTeeState
) {
712 ReadableStreamDefaultControllerError(
713 aCx
, aTeeState
->Branch1()->DefaultController(), aReason
, aRv
);
719 ReadableStreamDefaultControllerError(
720 aCx
, aTeeState
->Branch2()->DefaultController(), aReason
, aRv
);
726 if (!aTeeState
->Canceled1() || !aTeeState
->Canceled2()) {
727 aTeeState
->CancelPromise()->MaybeResolveWithUndefined();
733 aResult
.AppendElement(teeState
->Branch1());
734 aResult
.AppendElement(teeState
->Branch2());
737 // https://streams.spec.whatwg.org/#rs-pipe-to
738 already_AddRefed
<Promise
> ReadableStream::PipeTo(
739 WritableStream
& aDestination
, const StreamPipeOptions
& aOptions
,
741 // Step 1. If !IsReadableStreamLocked(this) is true, return a promise rejected
742 // with a TypeError exception.
743 if (IsReadableStreamLocked(this)) {
744 aRv
.ThrowTypeError("Cannot pipe from a locked stream.");
748 // Step 2. If !IsWritableStreamLocked(destination) is true, return a promise
749 // rejected with a TypeError exception.
750 if (IsWritableStreamLocked(&aDestination
)) {
751 aRv
.ThrowTypeError("Cannot pipe to a locked stream.");
755 // Step 3. Let signal be options["signal"] if it exists, or undefined
757 RefPtr
<AbortSignal
> signal
=
758 aOptions
.mSignal
.WasPassed() ? &aOptions
.mSignal
.Value() : nullptr;
760 // Step 4. Return ! ReadableStreamPipeTo(this, destination,
761 // options["preventClose"], options["preventAbort"], options["preventCancel"],
763 return ReadableStreamPipeTo(this, &aDestination
, aOptions
.mPreventClose
,
764 aOptions
.mPreventAbort
, aOptions
.mPreventCancel
,
768 // https://streams.spec.whatwg.org/#readable-stream-tee
770 static void ReadableStreamTee(JSContext
* aCx
, ReadableStream
* aStream
,
771 bool aCloneForBranch2
,
772 nsTArray
<RefPtr
<ReadableStream
>>& aResult
,
777 if (aStream
->Controller()->IsByte()) {
778 ReadableByteStreamTee(aCx
, aStream
, aResult
, aRv
);
782 ReadableStreamDefaultTee(aCx
, aStream
, aCloneForBranch2
, aResult
, aRv
);
785 void ReadableStream::Tee(JSContext
* aCx
,
786 nsTArray
<RefPtr
<ReadableStream
>>& aResult
,
788 ReadableStreamTee(aCx
, this, false, aResult
, aRv
);
791 void ReadableStream::IteratorData::Traverse(
792 nsCycleCollectionTraversalCallback
& cb
) {
793 ReadableStream::IteratorData
* tmp
= this;
794 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader
);
796 void ReadableStream::IteratorData::Unlink() {
797 ReadableStream::IteratorData
* tmp
= this;
798 NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader
);
801 // https://streams.spec.whatwg.org/#rs-get-iterator
802 void ReadableStream::InitAsyncIteratorData(
803 IteratorData
& aData
, Iterator::IteratorType aType
,
804 const ReadableStreamIteratorOptions
& aOptions
, ErrorResult
& aRv
) {
805 // Step 1. Let reader be ? AcquireReadableStreamDefaultReader(stream).
806 RefPtr
<ReadableStreamDefaultReader
> reader
=
807 AcquireReadableStreamDefaultReader(this, aRv
);
812 // Step 2. Set iterator’s reader to reader.
813 aData
.mReader
= reader
;
815 // Step 3. Let preventCancel be args[0]["preventCancel"].
816 // Step 4. Set iterator’s prevent cancel to preventCancel.
817 aData
.mPreventCancel
= aOptions
.mPreventCancel
;
820 // https://streams.spec.whatwg.org/#rs-asynciterator-prototype-next
822 struct IteratorReadRequest
: public ReadRequest
{
824 NS_DECL_ISUPPORTS_INHERITED
825 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(IteratorReadRequest
, ReadRequest
)
827 RefPtr
<Promise
> mPromise
;
828 RefPtr
<ReadableStreamDefaultReader
> mReader
;
830 explicit IteratorReadRequest(Promise
* aPromise
,
831 ReadableStreamDefaultReader
* aReader
)
832 : mPromise(aPromise
), mReader(aReader
) {}
834 // chunk steps, given chunk
835 void ChunkSteps(JSContext
* aCx
, JS::Handle
<JS::Value
> aChunk
,
836 ErrorResult
& aRv
) override
{
837 // Step 1. Resolve promise with chunk.
838 mPromise
->MaybeResolve(aChunk
);
842 void CloseSteps(JSContext
* aCx
, ErrorResult
& aRv
) override
{
843 // Step 1. Perform ! ReadableStreamDefaultReaderRelease(reader).
844 ReadableStreamDefaultReaderRelease(aCx
, mReader
, aRv
);
846 mPromise
->MaybeRejectWithUndefined();
850 // Step 2. Resolve promise with end of iteration.
851 iterator_utils::ResolvePromiseForFinished(mPromise
);
854 // error steps, given e
855 void ErrorSteps(JSContext
* aCx
, JS::Handle
<JS::Value
> aError
,
856 ErrorResult
& aRv
) override
{
857 // Step 1. Perform ! ReadableStreamDefaultReaderRelease(reader).
858 ReadableStreamDefaultReaderRelease(aCx
, mReader
, aRv
);
860 mPromise
->MaybeRejectWithUndefined();
864 // Step 2. Reject promise with e.
865 mPromise
->MaybeReject(aError
);
869 virtual ~IteratorReadRequest() = default;
872 NS_IMPL_CYCLE_COLLECTION_INHERITED(IteratorReadRequest
, ReadRequest
, mPromise
,
875 NS_IMPL_ADDREF_INHERITED(IteratorReadRequest
, ReadRequest
)
876 NS_IMPL_RELEASE_INHERITED(IteratorReadRequest
, ReadRequest
)
878 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(IteratorReadRequest
)
879 NS_INTERFACE_MAP_END_INHERITING(ReadRequest
)
881 // https://streams.spec.whatwg.org/#rs-asynciterator-prototype-next
882 already_AddRefed
<Promise
> ReadableStream::GetNextIterationResult(
883 Iterator
* aIterator
, ErrorResult
& aRv
) {
884 // Step 1. Let reader be iterator’s reader.
885 RefPtr
<ReadableStreamDefaultReader
> reader
= aIterator
->Data().mReader
;
887 // Step 2. Assert: reader.[[stream]] is not undefined.
888 MOZ_ASSERT(reader
->GetStream());
890 // Step 3. Let promise be a new promise.
891 RefPtr
<Promise
> promise
= Promise::CreateInfallible(GetParentObject());
893 // Step 4. Let readRequest be a new read request with the following items:
894 RefPtr
<ReadRequest
> request
= new IteratorReadRequest(promise
, reader
);
896 // Step 5. Perform ! ReadableStreamDefaultReaderRead(this, readRequest).
898 if (!jsapi
.Init(mGlobal
)) {
899 aRv
.ThrowUnknownError("Internal error");
903 ReadableStreamDefaultReaderRead(jsapi
.cx(), reader
, request
, aRv
);
908 // Step 6. Return promise.
909 return promise
.forget();
912 // https://streams.spec.whatwg.org/#rs-asynciterator-prototype-return
913 already_AddRefed
<Promise
> ReadableStream::IteratorReturn(
914 JSContext
* aCx
, Iterator
* aIterator
, JS::Handle
<JS::Value
> aValue
,
916 // Step 1. Let reader be iterator’s reader.
917 RefPtr
<ReadableStreamDefaultReader
> reader
= aIterator
->Data().mReader
;
919 // Step 2. Assert: reader.[[stream]] is not undefined.
920 MOZ_ASSERT(reader
->GetStream());
922 // Step 3. Assert: reader.[[readRequests]] is empty, as the async iterator
923 // machinery guarantees that any previous calls to next() have settled before
925 MOZ_ASSERT(reader
->ReadRequests().isEmpty());
927 // Step 4. If iterator’s prevent cancel is false:
928 if (!aIterator
->Data().mPreventCancel
) {
929 // Step 4.1. Let result be ! ReadableStreamReaderGenericCancel(reader, arg).
930 RefPtr
<ReadableStream
> stream(reader
->GetStream());
931 RefPtr
<Promise
> result
= ReadableStreamCancel(aCx
, stream
, aValue
, aRv
);
932 if (NS_WARN_IF(aRv
.Failed())) {
936 // Step 4.2. Perform ! ReadableStreamDefaultReaderRelease(reader).
937 ReadableStreamDefaultReaderRelease(aCx
, reader
, aRv
);
938 if (NS_WARN_IF(aRv
.Failed())) {
942 // Step 4.3. Return result.
943 return result
.forget();
946 // Step 5. Perform ! ReadableStreamDefaultReaderRelease(reader).
947 ReadableStreamDefaultReaderRelease(aCx
, reader
, aRv
);
948 if (NS_WARN_IF(aRv
.Failed())) {
952 // Step 6. Return a promise resolved with undefined.
953 return Promise::CreateResolvedWithUndefined(GetParentObject(), aRv
);
956 // https://streams.spec.whatwg.org/#readable-stream-add-read-into-request
957 void ReadableStreamAddReadIntoRequest(ReadableStream
* aStream
,
958 ReadIntoRequest
* aReadIntoRequest
) {
959 // Step 1. Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
960 MOZ_ASSERT(aStream
->GetReader()->IsBYOB());
962 // Step 2. Assert: stream.[[state]] is "readable" or "closed".
963 MOZ_ASSERT(aStream
->State() == ReadableStream::ReaderState::Readable
||
964 aStream
->State() == ReadableStream::ReaderState::Closed
);
966 // Step 3. Append readRequest to stream.[[reader]].[[readIntoRequests]].
967 aStream
->GetReader()->AsBYOB()->ReadIntoRequests().insertBack(
971 // https://streams.spec.whatwg.org/#abstract-opdef-createreadablebytestream
972 already_AddRefed
<ReadableStream
> CreateReadableByteStream(
973 JSContext
* aCx
, nsIGlobalObject
* aGlobal
,
974 UnderlyingSourceAlgorithmsBase
* aAlgorithms
, ErrorResult
& aRv
) {
975 // Step 1. Let stream be a new ReadableStream.
976 RefPtr
<ReadableStream
> stream
= new ReadableStream(aGlobal
);
978 // Step 2. Perform ! InitializeReadableStream(stream).
979 InitializeReadableStream(stream
);
981 // Step 3. Let controller be a new ReadableByteStreamController.
982 RefPtr
<ReadableByteStreamController
> controller
=
983 new ReadableByteStreamController(aGlobal
);
985 // Step 4. Perform ? SetUpReadableByteStreamController(stream, controller,
986 // startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
987 SetUpReadableByteStreamController(aCx
, stream
, controller
, aAlgorithms
, 0,
988 mozilla::Nothing(), aRv
);
994 return stream
.forget();
997 // https://streams.spec.whatwg.org/#readablestream-set-up
998 // (except this instead creates a new ReadableStream rather than accepting an
999 // existing instance)
1000 already_AddRefed
<ReadableStream
> ReadableStream::CreateNative(
1001 JSContext
* aCx
, nsIGlobalObject
* aGlobal
,
1002 UnderlyingSourceAlgorithmsWrapper
& aAlgorithms
,
1003 mozilla::Maybe
<double> aHighWaterMark
, QueuingStrategySize
* aSizeAlgorithm
,
1005 // an optional number highWaterMark (default 1)
1006 double highWaterMark
= aHighWaterMark
.valueOr(1);
1007 // and if given, highWaterMark must be a non-negative, non-NaN number.
1008 MOZ_ASSERT(IsNonNegativeNumber(highWaterMark
));
1010 // Step 1: Let startAlgorithm be an algorithm that returns undefined.
1011 // Step 2: Let pullAlgorithmWrapper be an algorithm that runs these steps:
1012 // Step 3: Let cancelAlgorithmWrapper be an algorithm that runs these steps:
1013 // (Done by UnderlyingSourceAlgorithmsWrapper)
1015 // Step 4: If sizeAlgorithm was not given, then set it to an algorithm that
1016 // returns 1. (Callers will treat nullptr as such, see
1017 // ReadableStream::Constructor for details)
1019 // Step 5: Perform ! InitializeReadableStream(stream).
1020 auto stream
= MakeRefPtr
<ReadableStream
>(aGlobal
);
1022 // Step 6: Let controller be a new ReadableStreamDefaultController.
1023 auto controller
= MakeRefPtr
<ReadableStreamDefaultController
>(aGlobal
);
1025 // Step 7: Perform ! SetUpReadableStreamDefaultController(stream, controller,
1026 // startAlgorithm, pullAlgorithmWrapper, cancelAlgorithmWrapper,
1027 // highWaterMark, sizeAlgorithm).
1028 SetUpReadableStreamDefaultController(aCx
, stream
, controller
, &aAlgorithms
,
1029 highWaterMark
, aSizeAlgorithm
, aRv
);
1033 return stream
.forget();
1036 // https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support
1037 // (except this instead creates a new ReadableStream rather than accepting an
1038 // existing instance)
1039 already_AddRefed
<ReadableStream
> ReadableStream::CreateByteNative(
1040 JSContext
* aCx
, nsIGlobalObject
* aGlobal
,
1041 UnderlyingSourceAlgorithmsWrapper
& aAlgorithms
,
1042 mozilla::Maybe
<double> aHighWaterMark
, ErrorResult
& aRv
) {
1043 // an optional number highWaterMark (default 0)
1044 double highWaterMark
= aHighWaterMark
.valueOr(0);
1046 // Step 1: Let startAlgorithm be an algorithm that returns undefined.
1047 // Step 2: Let pullAlgorithmWrapper be an algorithm that runs these steps:
1048 // Step 3: Let cancelAlgorithmWrapper be an algorithm that runs these steps:
1049 // (Done by UnderlyingSourceAlgorithmsWrapper)
1051 // Step 4: Perform ! InitializeReadableStream(stream).
1052 auto stream
= MakeRefPtr
<ReadableStream
>(aGlobal
);
1054 // Step 5: Let controller be a new ReadableByteStreamController.
1055 auto controller
= MakeRefPtr
<ReadableByteStreamController
>(aGlobal
);
1057 // Step 6: Perform ! SetUpReadableByteStreamController(stream, controller,
1058 // startAlgorithm, pullAlgorithmWrapper, cancelAlgorithmWrapper,
1059 // highWaterMark, undefined).
1060 SetUpReadableByteStreamController(aCx
, stream
, controller
, &aAlgorithms
,
1061 highWaterMark
, Nothing(), aRv
);
1065 return stream
.forget();
1068 // https://streams.spec.whatwg.org/#readablestream-close
1069 void ReadableStream::CloseNative(JSContext
* aCx
, ErrorResult
& aRv
) {
1070 MOZ_ASSERT(mController
->GetAlgorithms()->IsNative());
1072 // Step 1: If stream.[[controller]] implements ReadableByteStreamController,
1073 if (mController
->IsByte()) {
1074 RefPtr
<ReadableByteStreamController
> controller
= mController
->AsByte();
1076 // Step 1.1: Perform !
1077 // ReadableByteStreamControllerClose(stream.[[controller]]).
1078 ReadableByteStreamControllerClose(aCx
, controller
, aRv
);
1083 // Step 1.2: If stream.[[controller]].[[pendingPullIntos]] is not empty,
1084 // perform ! ReadableByteStreamControllerRespond(stream.[[controller]], 0).
1085 if (!controller
->PendingPullIntos().isEmpty()) {
1086 ReadableByteStreamControllerRespond(aCx
, controller
, 0, aRv
);
1091 // Step 2: Otherwise, perform !
1092 // ReadableStreamDefaultControllerClose(stream.[[controller]]).
1093 RefPtr
<ReadableStreamDefaultController
> controller
= mController
->AsDefault();
1094 ReadableStreamDefaultControllerClose(aCx
, controller
, aRv
);
1097 // https://streams.spec.whatwg.org/#readablestream-error
1098 void ReadableStream::ErrorNative(JSContext
* aCx
, JS::Handle
<JS::Value
> aError
,
1100 // Step 1: If stream.[[controller]] implements ReadableByteStreamController,
1101 // then perform ! ReadableByteStreamControllerError(stream.[[controller]], e).
1102 if (mController
->IsByte()) {
1103 ReadableByteStreamControllerError(mController
->AsByte(), aError
, aRv
);
1106 // Step 2: Otherwise, perform !
1107 // ReadableStreamDefaultControllerError(stream.[[controller]], e).
1108 ReadableStreamDefaultControllerError(aCx
, mController
->AsDefault(), aError
,
1112 // https://streams.spec.whatwg.org/#readablestream-current-byob-request-view
1113 static void CurrentBYOBRequestView(JSContext
* aCx
,
1114 ReadableByteStreamController
& aController
,
1115 JS::MutableHandle
<JSObject
*> aRetVal
,
1117 // Step 1. Assert: stream.[[controller]] implements
1118 // ReadableByteStreamController. (implicit)
1120 // Step 2: Let byobRequest be !
1121 // ReadableByteStreamControllerGetBYOBRequest(stream.[[controller]]).
1122 RefPtr
<ReadableStreamBYOBRequest
> byobRequest
=
1123 ReadableByteStreamControllerGetBYOBRequest(aCx
, &aController
, aRv
);
1124 // Step 3: If byobRequest is null, then return null.
1126 aRetVal
.set(nullptr);
1129 // Step 4: Return byobRequest.[[view]].
1130 byobRequest
->GetView(aCx
, aRetVal
);
1133 static bool HasSameBufferView(JSContext
* aCx
, JS::Handle
<JSObject
*> aX
,
1134 JS::Handle
<JSObject
*> aY
, ErrorResult
& aRv
) {
1136 JS::Rooted
<JSObject
*> viewedBufferX(
1137 aCx
, JS_GetArrayBufferViewBuffer(aCx
, aX
, &isShared
));
1138 if (!viewedBufferX
) {
1139 aRv
.StealExceptionFromJSContext(aCx
);
1143 JS::Rooted
<JSObject
*> viewedBufferY(
1144 aCx
, JS_GetArrayBufferViewBuffer(aCx
, aY
, &isShared
));
1145 if (!viewedBufferY
) {
1146 aRv
.StealExceptionFromJSContext(aCx
);
1150 return viewedBufferX
== viewedBufferY
;
1153 // https://streams.spec.whatwg.org/#readablestream-enqueue
1154 void ReadableStream::EnqueueNative(JSContext
* aCx
, JS::Handle
<JS::Value
> aChunk
,
1156 MOZ_ASSERT(mController
->GetAlgorithms()->IsNative());
1158 // Step 1: If stream.[[controller]] implements
1159 // ReadableStreamDefaultController,
1160 if (mController
->IsDefault()) {
1161 // Step 1.1: Perform !
1162 // ReadableStreamDefaultControllerEnqueue(stream.[[controller]], chunk).
1163 RefPtr
<ReadableStreamDefaultController
> controller
=
1164 mController
->AsDefault();
1165 ReadableStreamDefaultControllerEnqueue(aCx
, controller
, aChunk
, aRv
);
1169 // Step 2.1: Assert: stream.[[controller]] implements
1170 // ReadableByteStreamController.
1171 MOZ_ASSERT(mController
->IsByte());
1172 RefPtr
<ReadableByteStreamController
> controller
= mController
->AsByte();
1174 // Step 2.2: Assert: chunk is an ArrayBufferView.
1175 MOZ_ASSERT(aChunk
.isObject() &&
1176 JS_IsArrayBufferViewObject(&aChunk
.toObject()));
1177 JS::Rooted
<JSObject
*> chunk(aCx
, &aChunk
.toObject());
1179 // Step 3: Let byobView be the current BYOB request view for stream.
1180 JS::Rooted
<JSObject
*> byobView(aCx
);
1181 CurrentBYOBRequestView(aCx
, *controller
, &byobView
, aRv
);
1186 // Step 4: If byobView is non-null, and chunk.[[ViewedArrayBuffer]] is
1187 // byobView.[[ViewedArrayBuffer]], then:
1188 if (byobView
&& HasSameBufferView(aCx
, chunk
, byobView
, aRv
)) {
1189 // Step 4.1: Assert: chunk.[[ByteOffset]] is byobView.[[ByteOffset]].
1190 MOZ_ASSERT(JS_GetArrayBufferViewByteOffset(chunk
) ==
1191 JS_GetArrayBufferViewByteOffset(byobView
));
1192 // Step 4.2: Assert: chunk.[[ByteLength]] ≤ byobView.[[ByteLength]].
1193 MOZ_ASSERT(JS_GetArrayBufferViewByteLength(chunk
) ==
1194 JS_GetArrayBufferViewByteLength(byobView
));
1195 // Step 4.3: Perform ?
1196 // ReadableByteStreamControllerRespond(stream.[[controller]],
1197 // chunk.[[ByteLength]]).
1198 ReadableByteStreamControllerRespond(
1199 aCx
, controller
, JS_GetArrayBufferViewByteLength(chunk
), aRv
);
1207 // Step 5: Otherwise, perform ?
1208 // ReadableByteStreamControllerEnqueue(stream.[[controller]], chunk).
1209 ReadableByteStreamControllerEnqueue(aCx
, controller
, chunk
, aRv
);
1212 // https://streams.spec.whatwg.org/#readablestream-get-a-reader
1213 // To get a reader for a ReadableStream stream, return ?
1214 // AcquireReadableStreamDefaultReader(stream). The result will be a
1215 // ReadableStreamDefaultReader.
1216 already_AddRefed
<mozilla::dom::ReadableStreamDefaultReader
>
1217 ReadableStream::GetReader(ErrorResult
& aRv
) {
1218 return AcquireReadableStreamDefaultReader(this, aRv
);
1221 } // namespace mozilla::dom