Bug 1768570 [wpt PR 34013] - Update wpt metadata, a=testonly
[gecko.git] / dom / streams / ReadableStream.cpp
blob9963a493d886b1e676d95a0342049e19fe7a14ed
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/dom/ReadableStream.h"
8 #include "js/Array.h"
9 #include "js/Exception.h"
10 #include "js/PropertyAndElement.h"
11 #include "js/TypeDecls.h"
12 #include "js/Value.h"
13 #include "mozilla/AlreadyAddRefed.h"
14 #include "mozilla/Assertions.h"
15 #include "mozilla/Attributes.h"
16 #include "mozilla/CycleCollectedJSContext.h"
17 #include "mozilla/FloatingPoint.h"
18 #include "mozilla/HoldDropJSObjects.h"
19 #include "mozilla/StaticPrefs_dom.h"
20 #include "mozilla/dom/BindingCallContext.h"
21 #include "mozilla/dom/ByteStreamHelpers.h"
22 #include "mozilla/dom/BodyStream.h"
23 #include "mozilla/dom/QueueWithSizes.h"
24 #include "mozilla/dom/QueuingStrategyBinding.h"
25 #include "mozilla/dom/ReadIntoRequest.h"
26 #include "mozilla/dom/ReadRequest.h"
27 #include "mozilla/dom/ReadableByteStreamController.h"
28 #include "mozilla/dom/ReadableStreamBYOBReader.h"
29 #include "mozilla/dom/ReadableStreamBinding.h"
30 #include "mozilla/dom/ReadableStreamController.h"
31 #include "mozilla/dom/ReadableStreamDefaultController.h"
32 #include "mozilla/dom/ReadableStreamDefaultReader.h"
33 #include "mozilla/dom/ReadableStreamPipeTo.h"
34 #include "mozilla/dom/ReadableStreamTee.h"
35 #include "mozilla/dom/RootedDictionary.h"
36 #include "mozilla/dom/ScriptSettings.h"
37 #include "mozilla/dom/StreamUtils.h"
38 #include "mozilla/dom/TeeState.h"
39 #include "mozilla/dom/UnderlyingSourceBinding.h"
40 #include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
41 #include "mozilla/dom/WritableStream.h"
42 #include "mozilla/dom/WritableStreamDefaultWriter.h"
43 #include "nsCOMPtr.h"
45 #include "mozilla/dom/Promise-inl.h"
46 #include "nsIGlobalObject.h"
47 #include "nsISupports.h"
49 inline void ImplCycleCollectionTraverse(
50 nsCycleCollectionTraversalCallback& aCallback,
51 mozilla::Variant<mozilla::Nothing,
52 RefPtr<mozilla::dom::ReadableStreamDefaultReader>>&
53 aReader,
54 const char* aName, uint32_t aFlags = 0) {
55 if (aReader.is<RefPtr<mozilla::dom::ReadableStreamDefaultReader>>()) {
56 ImplCycleCollectionTraverse(
57 aCallback,
58 aReader.as<RefPtr<mozilla::dom::ReadableStreamDefaultReader>>(), aName,
59 aFlags);
63 inline void ImplCycleCollectionUnlink(
64 mozilla::Variant<mozilla::Nothing,
65 RefPtr<mozilla::dom::ReadableStreamDefaultReader>>&
66 aReader) {
67 aReader = AsVariant(mozilla::Nothing());
70 namespace mozilla::dom {
72 // Only needed for refcounted objects.
73 NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE_WITH_JS_MEMBERS(ReadableStream,
74 (mGlobal, mController,
75 mReader, mAlgorithms,
76 mNativeUnderlyingSource),
77 (mStoredError))
79 NS_IMPL_CYCLE_COLLECTING_ADDREF(ReadableStream)
80 NS_IMPL_CYCLE_COLLECTING_RELEASE(ReadableStream)
81 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ReadableStream)
82 NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
83 NS_INTERFACE_MAP_ENTRY(nsISupports)
84 NS_INTERFACE_MAP_END
86 ReadableStream::ReadableStream(nsIGlobalObject* aGlobal)
87 : mGlobal(aGlobal), mReader(nullptr) {
88 mozilla::HoldJSObjects(this);
91 ReadableStream::ReadableStream(const GlobalObject& aGlobal)
92 : mGlobal(do_QueryInterface(aGlobal.GetAsSupports())), mReader(nullptr) {
93 mozilla::HoldJSObjects(this);
96 ReadableStream::~ReadableStream() { mozilla::DropJSObjects(this); }
98 JSObject* ReadableStream::WrapObject(JSContext* aCx,
99 JS::Handle<JSObject*> aGivenProto) {
100 return ReadableStream_Binding::Wrap(aCx, this, aGivenProto);
103 ReadableStreamDefaultReader* ReadableStream::GetDefaultReader() {
104 return mReader->AsDefault();
107 void ReadableStream::SetReader(ReadableStreamGenericReader* aReader) {
108 mReader = aReader;
111 // https://streams.spec.whatwg.org/#readable-stream-has-byob-reader
112 bool ReadableStreamHasBYOBReader(ReadableStream* aStream) {
113 // Step 1. Let reader be stream.[[reader]].
114 ReadableStreamGenericReader* reader = aStream->GetReader();
116 // Step 2. If reader is undefined, return false.
117 if (!reader) {
118 return false;
121 // Step 3. If reader implements ReadableStreamBYOBReader, return true.
122 // Step 4. Return false.
123 return reader->IsBYOB();
126 // https://streams.spec.whatwg.org/#readable-stream-has-default-reader
127 bool ReadableStreamHasDefaultReader(ReadableStream* aStream) {
128 // Step 1. Let reader be stream.[[reader]].
129 ReadableStreamGenericReader* reader = aStream->GetReader();
131 // Step 2. If reader is undefined, return false.
132 if (!reader) {
133 return false;
136 // Step 3. If reader implements ReadableStreamDefaultReader, return true.
137 // Step 4. Return false.
138 return reader->IsDefault();
141 void ReadableStream::SetNativeUnderlyingSource(
142 BodyStreamHolder* aUnderlyingSource) {
143 mNativeUnderlyingSource = aUnderlyingSource;
146 void ReadableStream::ReleaseObjects() {
147 SetNativeUnderlyingSource(nullptr);
149 SetErrorAlgorithm(nullptr);
151 if (mController->IsByte()) {
152 ReadableByteStreamControllerClearAlgorithms(mController->AsByte());
153 return;
156 MOZ_ASSERT(mController->IsDefault());
157 ReadableStreamDefaultControllerClearAlgorithms(mController->AsDefault());
160 // Streams Spec: 4.2.4: https://streams.spec.whatwg.org/#rs-prototype
161 /* static */
162 already_AddRefed<ReadableStream> ReadableStream::Constructor(
163 const GlobalObject& aGlobal,
164 const Optional<JS::Handle<JSObject*>>& aUnderlyingSource,
165 const QueuingStrategy& aStrategy, ErrorResult& aRv) {
166 // Step 1.
167 JS::RootedObject underlyingSourceObj(
168 aGlobal.Context(),
169 aUnderlyingSource.WasPassed() ? aUnderlyingSource.Value() : nullptr);
171 // Step 2.
172 RootedDictionary<UnderlyingSource> underlyingSourceDict(aGlobal.Context());
173 if (underlyingSourceObj) {
174 JS::RootedValue objValue(aGlobal.Context(),
175 JS::ObjectValue(*underlyingSourceObj));
176 dom::BindingCallContext callCx(aGlobal.Context(),
177 "ReadableStream.constructor");
178 aRv.MightThrowJSException();
179 if (!underlyingSourceDict.Init(callCx, objValue)) {
180 aRv.StealExceptionFromJSContext(aGlobal.Context());
181 return nullptr;
185 // Step 3.
186 RefPtr<ReadableStream> readableStream = new ReadableStream(aGlobal);
188 // Step 4.
189 if (underlyingSourceDict.mType.WasPassed()) {
190 // Implicit assertion on above check.
191 MOZ_ASSERT(underlyingSourceDict.mType.Value() == ReadableStreamType::Bytes);
193 // Step 4.1
194 if (aStrategy.mSize.WasPassed()) {
195 aRv.ThrowRangeError("Implementation preserved member 'size'");
196 return nullptr;
199 // Step 4.2
200 double highWaterMark = ExtractHighWaterMark(aStrategy, 0, aRv);
201 if (aRv.Failed()) {
202 return nullptr;
205 // Step 4.3
206 if (!StaticPrefs::dom_streams_byte_streams_enabled()) {
207 aRv.ThrowNotSupportedError("BYOB byte streams not yet supported.");
208 return nullptr;
211 SetUpReadableByteStreamControllerFromUnderlyingSource(
212 aGlobal.Context(), readableStream, underlyingSourceObj,
213 underlyingSourceDict, highWaterMark, aRv);
214 if (aRv.Failed()) {
215 return nullptr;
218 return readableStream.forget();
221 // Step 5.1 (implicit in above check)
222 // Step 5.2. Extract callback.
224 // Implementation Note: The specification demands that if the size doesn't
225 // exist, we instead would provide an algorithm that returns 1. Instead, we
226 // will teach callers that a missing callback should simply return 1, rather
227 // than gin up a fake callback here.
229 // This decision may need to be revisited if the default action ever diverges
230 // within the specification.
231 RefPtr<QueuingStrategySize> sizeAlgorithm =
232 aStrategy.mSize.WasPassed() ? &aStrategy.mSize.Value() : nullptr;
234 // Step 5.3
235 double highWaterMark = ExtractHighWaterMark(aStrategy, 1, aRv);
236 if (aRv.Failed()) {
237 return nullptr;
240 // Step 5.4.
241 SetupReadableStreamDefaultControllerFromUnderlyingSource(
242 aGlobal.Context(), readableStream, underlyingSourceObj,
243 underlyingSourceDict, highWaterMark, sizeAlgorithm, aRv);
244 if (aRv.Failed()) {
245 return nullptr;
248 return readableStream.forget();
251 // Dealing with const this ptr is a pain, so just re-implement.
252 // https://streams.spec.whatwg.org/#is-readable-stream-locked
253 bool ReadableStream::Locked() const {
254 // Step 1 + 2.
255 return mReader;
258 // https://streams.spec.whatwg.org/#initialize-readable-stream
259 static void InitializeReadableStream(ReadableStream* aStream) {
260 // Step 1.
261 aStream->SetState(ReadableStream::ReaderState::Readable);
263 // Step 2.
264 aStream->SetReader(nullptr);
265 aStream->SetStoredError(JS::UndefinedHandleValue);
267 // Step 3.
268 aStream->SetDisturbed(false);
271 // https://streams.spec.whatwg.org/#create-readable-stream
272 MOZ_CAN_RUN_SCRIPT
273 already_AddRefed<ReadableStream> CreateReadableStream(
274 JSContext* aCx, nsIGlobalObject* aGlobal,
275 UnderlyingSourceAlgorithmsBase* aAlgorithms,
276 mozilla::Maybe<double> aHighWaterMark, QueuingStrategySize* aSizeAlgorithm,
277 ErrorResult& aRv) {
278 // Step 1.
279 double highWaterMark = aHighWaterMark.isSome() ? *aHighWaterMark : 1.0;
281 // Step 2. consumers of sizeAlgorithm
282 // handle null algorithms correctly.
283 // Step 3.
284 MOZ_ASSERT(IsNonNegativeNumber(highWaterMark));
285 // Step 4.
286 RefPtr<ReadableStream> stream = new ReadableStream(aGlobal);
288 // Step 5.
289 InitializeReadableStream(stream);
291 // Step 6.
292 RefPtr<ReadableStreamDefaultController> controller =
293 new ReadableStreamDefaultController(aGlobal);
295 // Step 7.
296 SetUpReadableStreamDefaultController(aCx, stream, controller, aAlgorithms,
297 highWaterMark, aSizeAlgorithm, aRv);
299 // Step 8.
300 return stream.forget();
303 // https://streams.spec.whatwg.org/#readable-stream-close
304 void ReadableStreamClose(JSContext* aCx, ReadableStream* aStream,
305 ErrorResult& aRv) {
306 // Step 1.
307 MOZ_ASSERT(aStream->State() == ReadableStream::ReaderState::Readable);
309 // Step 2.
310 aStream->SetState(ReadableStream::ReaderState::Closed);
312 // Step 3.
313 ReadableStreamGenericReader* reader = aStream->GetReader();
315 // Step 4.
316 if (!reader) {
317 return;
320 // Step 5.
321 reader->ClosedPromise()->MaybeResolveWithUndefined();
323 // Step 6.
324 if (reader->IsDefault()) {
325 // Step 6.1. Let readRequests be reader.[[readRequests]].
326 // Move LinkedList out of DefaultReader onto stack to avoid the potential
327 // for concurrent modification, which could invalidate the iterator.
329 // See https://bugs.chromium.org/p/chromium/issues/detail?id=1045874 as an
330 // example of the kind of issue that could occur.
331 LinkedList<RefPtr<ReadRequest>> readRequests =
332 std::move(reader->AsDefault()->ReadRequests());
334 // Step 6.2. Set reader.[[readRequests]] to an empty list.
335 // Note: The std::move already cleared this anyway.
336 reader->AsDefault()->ReadRequests().clear();
338 // Step 6.3. For each readRequest of readRequests,
339 // Drain the local list and destroy elements along the way.
340 while (RefPtr<ReadRequest> readRequest = readRequests.popFirst()) {
341 // Step 6.3.1. Perform readRequest’s close steps.
342 readRequest->CloseSteps(aCx, aRv);
343 if (aRv.Failed()) {
344 return;
350 // https://streams.spec.whatwg.org/#readable-stream-cancel
351 already_AddRefed<Promise> ReadableStreamCancel(JSContext* aCx,
352 ReadableStream* aStream,
353 JS::Handle<JS::Value> aError,
354 ErrorResult& aRv) {
355 // Step 1.
356 aStream->SetDisturbed(true);
358 // Step 2.
359 if (aStream->State() == ReadableStream::ReaderState::Closed) {
360 RefPtr<Promise> promise = Promise::Create(aStream->GetParentObject(), aRv);
361 if (aRv.Failed()) {
362 return nullptr;
365 promise->MaybeResolveWithUndefined();
366 return promise.forget();
369 // Step 3.
370 if (aStream->State() == ReadableStream::ReaderState::Errored) {
371 RefPtr<Promise> promise = Promise::Create(aStream->GetParentObject(), aRv);
372 if (aRv.Failed()) {
373 return nullptr;
375 JS::RootedValue storedError(aCx, aStream->StoredError());
376 promise->MaybeReject(storedError);
377 return promise.forget();
380 // Step 4.
381 ReadableStreamClose(aCx, aStream, aRv);
382 if (aRv.Failed()) {
383 return nullptr;
386 // Step 5.
387 ReadableStreamGenericReader* reader = aStream->GetReader();
389 // Step 6.
390 if (reader && reader->IsBYOB()) {
391 // Step 6.1. Let readIntoRequests be reader.[[readIntoRequests]].
392 LinkedList<RefPtr<ReadIntoRequest>> readIntoRequests =
393 std::move(reader->AsBYOB()->ReadIntoRequests());
395 // Step 6.2. Set reader.[[readIntoRequests]] to an empty list.
396 // Note: The std::move already cleared this anyway.
397 reader->AsBYOB()->ReadIntoRequests().clear();
399 // Step 6.3. For each readIntoRequest of readIntoRequests,
400 while (RefPtr<ReadIntoRequest> readIntoRequest =
401 readIntoRequests.popFirst()) {
402 // Step 6.3.1.Perform readIntoRequest’s close steps, given undefined.
403 readIntoRequest->CloseSteps(aCx, JS::UndefinedHandleValue, aRv);
404 if (aRv.Failed()) {
405 return nullptr;
410 // Step 7.
411 RefPtr<ReadableStreamController> controller(aStream->Controller());
412 RefPtr<Promise> sourceCancelPromise =
413 controller->CancelSteps(aCx, aError, aRv);
414 if (aRv.Failed()) {
415 return nullptr;
418 // Step 8.
419 RefPtr<Promise> promise =
420 Promise::Create(sourceCancelPromise->GetParentObject(), aRv);
421 if (aRv.Failed()) {
422 return nullptr;
425 // ThenWithCycleCollectedArgs will carry promise, keeping it alive until the
426 // callback executes.
427 Result<RefPtr<Promise>, nsresult> returnResult =
428 sourceCancelPromise->ThenWithCycleCollectedArgs(
429 [](JSContext*, JS::HandleValue, ErrorResult&,
430 RefPtr<Promise> newPromise) {
431 newPromise->MaybeResolveWithUndefined();
432 return newPromise.forget();
434 promise);
436 if (returnResult.isErr()) {
437 aRv.Throw(returnResult.unwrapErr());
438 return nullptr;
441 return returnResult.unwrap().forget();
444 // https://streams.spec.whatwg.org/#rs-cancel
445 already_AddRefed<Promise> ReadableStream::Cancel(JSContext* aCx,
446 JS::Handle<JS::Value> aReason,
447 ErrorResult& aRv) {
448 // Step 1. If ! IsReadableStreamLocked(this) is true,
449 // return a promise rejected with a TypeError exception.
450 if (Locked()) {
451 aRv.ThrowTypeError("Cannot cancel a stream locked by a reader.");
452 return nullptr;
455 // Step 2. Return ! ReadableStreamCancel(this, reason).
456 RefPtr<ReadableStream> thisRefPtr = this;
457 return ReadableStreamCancel(aCx, thisRefPtr, aReason, aRv);
460 // https://streams.spec.whatwg.org/#acquire-readable-stream-reader
461 already_AddRefed<ReadableStreamDefaultReader>
462 AcquireReadableStreamDefaultReader(ReadableStream* aStream, ErrorResult& aRv) {
463 // Step 1.
464 RefPtr<ReadableStreamDefaultReader> reader =
465 new ReadableStreamDefaultReader(aStream->GetParentObject());
467 // Step 2.
468 SetUpReadableStreamDefaultReader(reader, aStream, aRv);
469 if (aRv.Failed()) {
470 return nullptr;
473 // Step 3.
474 return reader.forget();
477 // https://streams.spec.whatwg.org/#rs-get-reader
478 void ReadableStream::GetReader(const ReadableStreamGetReaderOptions& aOptions,
479 OwningReadableStreamReader& resultReader,
480 ErrorResult& aRv) {
481 // Step 1. If options["mode"] does not exist,
482 // return ? AcquireReadableStreamDefaultReader(this).
483 if (!aOptions.mMode.WasPassed()) {
484 RefPtr<ReadableStream> thisRefPtr = this;
485 RefPtr<ReadableStreamDefaultReader> defaultReader =
486 AcquireReadableStreamDefaultReader(thisRefPtr, aRv);
487 if (aRv.Failed()) {
488 return;
490 resultReader.SetAsReadableStreamDefaultReader() = defaultReader;
491 return;
494 // Step 2. Assert: options["mode"] is "byob".
495 MOZ_ASSERT(aOptions.mMode.Value() == ReadableStreamReaderMode::Byob);
497 // Step 3. Return ? AcquireReadableStreamBYOBReader(this).
498 if (!StaticPrefs::dom_streams_byte_streams_enabled()) {
499 aRv.ThrowTypeError("BYOB byte streams reader not yet supported.");
500 return;
503 RefPtr<ReadableStream> thisRefPtr = this;
504 RefPtr<ReadableStreamBYOBReader> byobReader =
505 AcquireReadableStreamBYOBReader(thisRefPtr, aRv);
506 if (aRv.Failed()) {
507 return;
509 resultReader.SetAsReadableStreamBYOBReader() = byobReader;
512 // https://streams.spec.whatwg.org/#is-readable-stream-locked
513 bool IsReadableStreamLocked(ReadableStream* aStream) {
514 // Step 1 + 2.
515 return aStream->Locked();
518 // https://streams.spec.whatwg.org/#rs-pipe-through
519 MOZ_CAN_RUN_SCRIPT already_AddRefed<ReadableStream> ReadableStream::PipeThrough(
520 const ReadableWritablePair& aTransform, const StreamPipeOptions& aOptions,
521 ErrorResult& aRv) {
522 // Step 1: If ! IsReadableStreamLocked(this) is true, throw a TypeError
523 // exception.
524 if (IsReadableStreamLocked(this)) {
525 aRv.ThrowTypeError("Cannot pipe from a locked stream.");
526 return nullptr;
529 // Step 2: If ! IsWritableStreamLocked(transform["writable"]) is true, throw a
530 // TypeError exception.
531 if (IsWritableStreamLocked(aTransform.mWritable)) {
532 aRv.ThrowTypeError("Cannot pipe to a locked stream.");
533 return nullptr;
536 // Step 3: Let signal be options["signal"] if it exists, or undefined
537 // otherwise.
538 RefPtr<AbortSignal> signal =
539 aOptions.mSignal.WasPassed() ? &aOptions.mSignal.Value() : nullptr;
541 // Step 4: Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
542 // options["preventClose"], options["preventAbort"], options["preventCancel"],
543 // signal).
544 RefPtr<WritableStream> writable = aTransform.mWritable;
545 RefPtr<Promise> promise = ReadableStreamPipeTo(
546 this, writable, aOptions.mPreventClose, aOptions.mPreventAbort,
547 aOptions.mPreventCancel, signal, aRv);
548 if (aRv.Failed()) {
549 return nullptr;
552 // Step 5: Set promise.[[PromiseIsHandled]] to true.
553 MOZ_ALWAYS_TRUE(promise->SetAnyPromiseIsHandled());
555 // Step 6: Return transform["readable"].
556 return do_AddRef(aTransform.mReadable.get());
559 // https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests
560 double ReadableStreamGetNumReadRequests(ReadableStream* aStream) {
561 // Step 1.
562 MOZ_ASSERT(ReadableStreamHasDefaultReader(aStream));
564 // Step 2.
565 return double(aStream->GetDefaultReader()->ReadRequests().length());
568 // https://streams.spec.whatwg.org/#readable-stream-error
569 void ReadableStreamError(JSContext* aCx, ReadableStream* aStream,
570 JS::Handle<JS::Value> aValue, ErrorResult& aRv) {
571 // Step 1.
572 MOZ_ASSERT(aStream->State() == ReadableStream::ReaderState::Readable);
574 // Step 2.
575 aStream->SetState(ReadableStream::ReaderState::Errored);
577 // Step 3.
578 aStream->SetStoredError(aValue);
580 // Step 4.
581 ReadableStreamGenericReader* reader = aStream->GetReader();
583 // Step 5.
584 if (!reader) {
585 return;
588 // Step 6.
589 reader->ClosedPromise()->MaybeReject(aValue);
591 // Step 7.
592 reader->ClosedPromise()->SetSettledPromiseIsHandled();
594 // Step 8.
595 if (reader->IsDefault()) {
596 // Step 8.1. Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader,
597 // e).
598 RefPtr<ReadableStreamDefaultReader> defaultReader = reader->AsDefault();
599 ReadableStreamDefaultReaderErrorReadRequests(aCx, defaultReader, aValue,
600 aRv);
601 if (aRv.Failed()) {
602 return;
604 } else {
605 // Step 9. Otherwise,
606 // Step 9.1. Assert: reader implements ReadableStreamBYOBReader.
607 MOZ_ASSERT(reader->IsBYOB());
609 // Step 9.2. Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader,
610 // e).
611 RefPtr<ReadableStreamBYOBReader> byobReader = reader->AsBYOB();
612 ReadableStreamBYOBReaderErrorReadIntoRequests(aCx, byobReader, aValue, aRv);
613 if (aRv.Failed()) {
614 return;
618 // Not in Specification: Allow notifying native underlying sources that a
619 // stream has been errored.
620 if (UnderlyingSourceAlgorithmsBase* algorithms = aStream->GetAlgorithms()) {
621 algorithms->ErrorCallback();
625 // https://streams.spec.whatwg.org/#rs-default-controller-close
626 void ReadableStreamFulfillReadRequest(JSContext* aCx, ReadableStream* aStream,
627 JS::Handle<JS::Value> aChunk, bool aDone,
628 ErrorResult& aRv) {
629 // Step 1.
630 MOZ_ASSERT(ReadableStreamHasDefaultReader(aStream));
632 // Step 2.
633 ReadableStreamDefaultReader* reader = aStream->GetDefaultReader();
635 // Step 3.
636 MOZ_ASSERT(!reader->ReadRequests().isEmpty());
638 // Step 4+5.
639 RefPtr<ReadRequest> readRequest = reader->ReadRequests().popFirst();
641 // Step 6.
642 if (aDone) {
643 readRequest->CloseSteps(aCx, aRv);
644 if (aRv.Failed()) {
645 return;
649 // Step 7.
650 readRequest->ChunkSteps(aCx, aChunk, aRv);
653 // https://streams.spec.whatwg.org/#readable-stream-add-read-request
654 void ReadableStreamAddReadRequest(ReadableStream* aStream,
655 ReadRequest* aReadRequest) {
656 // Step 1.
657 MOZ_ASSERT(aStream->GetReader()->IsDefault());
658 // Step 2.
659 MOZ_ASSERT(aStream->State() == ReadableStream::ReaderState::Readable);
660 // Step 3.
661 aStream->GetDefaultReader()->ReadRequests().insertBack(aReadRequest);
664 // https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee
665 // Step 14, 15
666 MOZ_CAN_RUN_SCRIPT already_AddRefed<Promise>
667 ReadableStreamDefaultTeeSourceAlgorithms::CancelCallback(
668 JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
669 ErrorResult& aRv) {
670 // Step 1.
671 mTeeState->SetCanceled(mBranch, true);
673 // Step 2.
674 mTeeState->SetReason(mBranch, aReason.Value());
676 // Step 3.
678 if (mTeeState->Canceled(OtherTeeBranch(mBranch))) {
679 // Step 3.1
681 JS::RootedObject compositeReason(aCx, JS::NewArrayObject(aCx, 2));
682 if (!compositeReason) {
683 aRv.StealExceptionFromJSContext(aCx);
684 return nullptr;
687 JS::RootedValue reason1(aCx, mTeeState->Reason1());
688 if (!JS_SetElement(aCx, compositeReason, 0, reason1)) {
689 aRv.StealExceptionFromJSContext(aCx);
690 return nullptr;
693 JS::RootedValue reason2(aCx, mTeeState->Reason2());
694 if (!JS_SetElement(aCx, compositeReason, 1, reason2)) {
695 aRv.StealExceptionFromJSContext(aCx);
696 return nullptr;
699 // Step 3.2
700 JS::RootedValue compositeReasonValue(aCx,
701 JS::ObjectValue(*compositeReason));
702 RefPtr<ReadableStream> stream(mTeeState->GetStream());
703 RefPtr<Promise> cancelResult =
704 ReadableStreamCancel(aCx, stream, compositeReasonValue, aRv);
705 if (aRv.Failed()) {
706 return nullptr;
709 // Step 3.3
710 mTeeState->CancelPromise()->MaybeResolve(cancelResult);
713 // Step 4.
714 return do_AddRef(mTeeState->CancelPromise());
717 // https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee
718 MOZ_CAN_RUN_SCRIPT
719 static void ReadableStreamDefaultTee(JSContext* aCx, ReadableStream* aStream,
720 bool aCloneForBranch2,
721 nsTArray<RefPtr<ReadableStream>>& aResult,
722 ErrorResult& aRv) {
723 // Step 1. Implicit.
724 // Step 2. Implicit.
726 // Steps 3-12 are contained in the construction of Tee State.
727 RefPtr<TeeState> teeState = TeeState::Create(aStream, aCloneForBranch2, aRv);
728 if (aRv.Failed()) {
729 return;
732 // Step 13 - 16
733 auto branch1Algorithms = MakeRefPtr<ReadableStreamDefaultTeeSourceAlgorithms>(
734 teeState, TeeBranch::Branch1);
735 auto branch2Algorithms = MakeRefPtr<ReadableStreamDefaultTeeSourceAlgorithms>(
736 teeState, TeeBranch::Branch2);
738 // Step 17.
739 nsCOMPtr<nsIGlobalObject> global(
740 do_AddRef(teeState->GetStream()->GetParentObject()));
741 teeState->SetBranch1(CreateReadableStream(aCx, global, branch1Algorithms,
742 mozilla::Nothing(), nullptr, aRv));
743 if (aRv.Failed()) {
744 return;
747 // Step 18.
748 teeState->SetBranch2(CreateReadableStream(aCx, global, branch2Algorithms,
749 mozilla::Nothing(), nullptr, aRv));
750 if (aRv.Failed()) {
751 return;
754 // Step 19.
755 teeState->GetReader()->ClosedPromise()->AddCallbacksWithCycleCollectedArgs(
756 [](JSContext* aCx, JS::Handle<JS::Value> aValue, ErrorResult& aRv,
757 TeeState* aTeeState) {},
758 [](JSContext* aCx, JS::Handle<JS::Value> aReason, ErrorResult& aRv,
759 TeeState* aTeeState) {
760 // Step 19.1.
761 ReadableStreamDefaultControllerError(
762 aCx, aTeeState->Branch1()->DefaultController(), aReason, aRv);
763 if (aRv.Failed()) {
764 return;
767 // Step 19.2
768 ReadableStreamDefaultControllerError(
769 aCx, aTeeState->Branch2()->DefaultController(), aReason, aRv);
770 if (aRv.Failed()) {
771 return;
774 // Step 19.3
775 if (!aTeeState->Canceled1() || !aTeeState->Canceled2()) {
776 aTeeState->CancelPromise()->MaybeResolveWithUndefined();
779 RefPtr(teeState));
781 // Step 20.
782 aResult.AppendElement(teeState->Branch1());
783 aResult.AppendElement(teeState->Branch2());
786 // https://streams.spec.whatwg.org/#rs-pipe-to
787 already_AddRefed<Promise> ReadableStream::PipeTo(
788 WritableStream& aDestination, const StreamPipeOptions& aOptions,
789 ErrorResult& aRv) {
790 // Step 1. If !IsReadableStreamLocked(this) is true, return a promise rejected
791 // with a TypeError exception.
792 if (IsReadableStreamLocked(this)) {
793 aRv.ThrowTypeError("Cannot pipe from a locked stream.");
794 return nullptr;
797 // Step 2. If !IsWritableStreamLocked(destination) is true, return a promise
798 // rejected with a TypeError exception.
799 if (IsWritableStreamLocked(&aDestination)) {
800 aRv.ThrowTypeError("Cannot pipe to a locked stream.");
801 return nullptr;
804 // Step 3. Let signal be options["signal"] if it exists, or undefined
805 // otherwise.
806 RefPtr<AbortSignal> signal =
807 aOptions.mSignal.WasPassed() ? &aOptions.mSignal.Value() : nullptr;
809 // Step 4. Return ! ReadableStreamPipeTo(this, destination,
810 // options["preventClose"], options["preventAbort"], options["preventCancel"],
811 // signal).
812 return ReadableStreamPipeTo(this, &aDestination, aOptions.mPreventClose,
813 aOptions.mPreventAbort, aOptions.mPreventCancel,
814 signal, aRv);
817 // https://streams.spec.whatwg.org/#readable-stream-tee
818 MOZ_CAN_RUN_SCRIPT
819 static void ReadableStreamTee(JSContext* aCx, ReadableStream* aStream,
820 bool aCloneForBranch2,
821 nsTArray<RefPtr<ReadableStream>>& aResult,
822 ErrorResult& aRv) {
823 // Step 1. Implicit.
824 // Step 2. Implicit.
825 // Step 3.
826 if (aStream->Controller()->IsByte()) {
827 ReadableByteStreamTee(aCx, aStream, aResult, aRv);
828 return;
830 // Step 4.
831 ReadableStreamDefaultTee(aCx, aStream, aCloneForBranch2, aResult, aRv);
834 void ReadableStream::Tee(JSContext* aCx,
835 nsTArray<RefPtr<ReadableStream>>& aResult,
836 ErrorResult& aRv) {
837 ReadableStreamTee(aCx, this, false, aResult, aRv);
840 // https://streams.spec.whatwg.org/#readable-stream-add-read-into-request
841 void ReadableStreamAddReadIntoRequest(ReadableStream* aStream,
842 ReadIntoRequest* aReadIntoRequest) {
843 // Step 1. Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
844 MOZ_ASSERT(aStream->GetReader()->IsBYOB());
846 // Step 2. Assert: stream.[[state]] is "readable" or "closed".
847 MOZ_ASSERT(aStream->State() == ReadableStream::ReaderState::Readable ||
848 aStream->State() == ReadableStream::ReaderState::Closed);
850 // Step 3. Append readRequest to stream.[[reader]].[[readIntoRequests]].
851 aStream->GetReader()->AsBYOB()->ReadIntoRequests().insertBack(
852 aReadIntoRequest);
855 // https://streams.spec.whatwg.org/#abstract-opdef-createreadablebytestream
856 already_AddRefed<ReadableStream> CreateReadableByteStream(
857 JSContext* aCx, nsIGlobalObject* aGlobal,
858 UnderlyingSourceAlgorithmsBase* aAlgorithms, ErrorResult& aRv) {
859 // Step 1. Let stream be a new ReadableStream.
860 RefPtr<ReadableStream> stream = new ReadableStream(aGlobal);
862 // Step 2. Perform ! InitializeReadableStream(stream).
863 InitializeReadableStream(stream);
865 // Step 3. Let controller be a new ReadableByteStreamController.
866 RefPtr<ReadableByteStreamController> controller =
867 new ReadableByteStreamController(aGlobal);
869 // Step 4. Perform ? SetUpReadableByteStreamController(stream, controller,
870 // startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
871 SetUpReadableByteStreamController(aCx, stream, controller, aAlgorithms, 0,
872 mozilla::Nothing(), aRv);
873 if (aRv.Failed()) {
874 return nullptr;
877 // Return stream.
878 return stream.forget();
881 already_AddRefed<ReadableStream> ReadableStream::Create(
882 JSContext* aCx, nsIGlobalObject* aGlobal,
883 BodyStreamHolder* aUnderlyingSource, ErrorResult& aRv) {
884 RefPtr<ReadableStream> stream = new ReadableStream(aGlobal);
886 stream->SetNativeUnderlyingSource(aUnderlyingSource);
888 SetUpReadableByteStreamControllerFromBodyStreamUnderlyingSource(
889 aCx, stream, aUnderlyingSource, aRv);
891 if (aRv.Failed()) {
892 return nullptr;
895 // Step 5. Return stream.
896 return stream.forget();
899 } // namespace mozilla::dom