1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/dom/AbortFollower.h"
8 #include "mozilla/dom/AbortSignal.h"
9 #include "mozilla/dom/ReadableStream.h"
10 #include "mozilla/dom/ReadableStreamDefaultReader.h"
11 #include "mozilla/dom/ReadableStreamPipeTo.h"
12 #include "mozilla/dom/WritableStream.h"
13 #include "mozilla/dom/WritableStreamDefaultWriter.h"
14 #include "mozilla/dom/Promise.h"
15 #include "mozilla/dom/PromiseNativeHandler.h"
16 #include "mozilla/AlreadyAddRefed.h"
17 #include "mozilla/ErrorResult.h"
18 #include "nsCycleCollectionParticipant.h"
19 #include "nsISupportsImpl.h"
21 #include "js/Exception.h"
23 namespace mozilla::dom
{
// Forward declarations of the helper types defined later in this file; each
// one is declared a friend of PipeToPump.
25 struct PipeToReadRequest
;
26 class WriteFinishedPromiseHandler
;
27 class ShutdownActionFinishedPromiseHandler
;
// Makes the global-namespace ImplCycleCollectionUnlink visible by unqualified
// name inside mozilla::dom.
30 using ::ImplCycleCollectionUnlink
;
32 // https://streams.spec.whatwg.org/#readable-stream-pipe-to (Steps 14-15.)
34 // This class implements everything that is required to read all chunks from
35 // the reader (source) and write them to writer (destination), while
36 // following the constraints given in the spec using our implementation-defined
// behavior.
39 // The cycle-collected references look roughly like this:
42 // Closed promise <-- ReadableStreamDefaultReader <--> ReadableStream
44 // |(PromiseHandler) |(mReader) |(ReadRequest)
46 // |-------------> PipeToPump <-------
48 // |---------------| | |
49 // | | |-------(mLastWrite) -------->
50 // |(PromiseHandler) | |< ---- (PromiseHandler) ---- Promise
52 // | |(mWriter) |(mWriteRequests)
54 // Closed promise <-- WritableStreamDefaultWriter <--------> WritableStream
// Pumps chunks from a ReadableStreamDefaultReader (mReader) to a
// WritableStreamDefaultWriter (mWriter) and settles mPromise once the pipe is
// finalized. It derives from AbortFollower and overrides RunAbortAlgorithm.
57 class PipeToPump final
: public AbortFollower
{
58 NS_DECL_CYCLE_COLLECTING_ISUPPORTS
59 NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPump
)
// The helper types below call back into PipeToPump's non-public members.
61 friend struct PipeToReadRequest
;
62 friend class WriteFinishedPromiseHandler
;
63 friend class ShutdownActionFinishedPromiseHandler
;
// Stores the three prevent* flags as immutable options.
// NOTE(review): the initializers for mPromise/mReader/mWriter are not visible
// in this excerpt — presumably they are initialized from the first three
// arguments; confirm against the full file.
65 PipeToPump(Promise
* aPromise
, ReadableStreamDefaultReader
* aReader
,
66 WritableStreamDefaultWriter
* aWriter
, bool aPreventClose
,
67 bool aPreventAbort
, bool aPreventCancel
)
71 mPreventClose(aPreventClose
),
72 mPreventAbort(aPreventAbort
),
73 mPreventCancel(aPreventCancel
) {}
// Entry point: handles the abort signal (spec step 14) and begins piping
// (spec step 15).
75 MOZ_CAN_RUN_SCRIPT
void Start(JSContext
* aCx
, AbortSignal
* aSignal
);
// AbortFollower override; forwards to PerformAbortAlgorithm.
77 MOZ_CAN_RUN_SCRIPT_BOUNDARY
void RunAbortAlgorithm() override
;
80 ~PipeToPump() override
= default;
// Spec step 14.1 ("abortAlgorithm"); aSignal must already be aborted.
82 MOZ_CAN_RUN_SCRIPT
void PerformAbortAlgorithm(JSContext
* aCx
,
83 AbortSignalImpl
* aSignal
);
// Checks the source/destination state once and invokes the matching
// On{Source,Dest}{Errored,Closed} handler.
85 MOZ_CAN_RUN_SCRIPT
bool SourceOrDestErroredOrClosed(JSContext
* aCx
);
// Function-pointer type of a shutdown action. The action returns a Promise;
// the lambdas passed as actions in this file additionally take an ErrorResult&
// as their last parameter.
87 using ShutdownAction
= already_AddRefed
<Promise
> (*)(
88 JSContext
*, PipeToPump
*, JS::Handle
<mozilla::Maybe
<JS::Value
>>,
// "Shutdown with an action": sets mShuttingDown, waits for mLastWritePromise
// if one exists, then continues in ShutdownWithActionAfterFinishedWrite.
91 MOZ_CAN_RUN_SCRIPT
void ShutdownWithAction(
92 JSContext
* aCx
, ShutdownAction aAction
,
93 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
);
// Second half of "shutdown with an action": runs aAction and finalizes when
// its promise settles.
94 MOZ_CAN_RUN_SCRIPT
void ShutdownWithActionAfterFinishedWrite(
95 JSContext
* aCx
, ShutdownAction aAction
,
96 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
);
// Plain "shutdown"; implemented as ShutdownWithAction with a null action.
98 MOZ_CAN_RUN_SCRIPT
void Shutdown(
99 JSContext
* aCx
, JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
);
// Releases writer and reader and settles mPromise (rejects when aError holds a
// value, resolves with undefined otherwise).
101 void Finalize(JSContext
* aCx
, JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
);
// Called by PipeToReadRequest::ChunkSteps with each chunk that was read.
103 MOZ_CAN_RUN_SCRIPT
void OnReadFulfilled(JSContext
* aCx
,
104 JS::Handle
<JS::Value
> aChunk
,
// Fulfillment handler for the writer's ready promise (see Read).
106 MOZ_CAN_RUN_SCRIPT
void OnWriterReady(JSContext
* aCx
, JS::Handle
<JS::Value
>);
// Issues the next read unless backpressure or writer state forbids it.
107 MOZ_CAN_RUN_SCRIPT
void Read(JSContext
* aCx
);
// Handlers for the reader's closed promise (fulfilled / rejected).
109 MOZ_CAN_RUN_SCRIPT
void OnSourceClosed(JSContext
* aCx
, JS::Handle
<JS::Value
>);
110 MOZ_CAN_RUN_SCRIPT
void OnSourceErrored(JSContext
* aCx
,
111 JS::Handle
<JS::Value
> aError
);
// Handlers for the writer's closed promise (fulfilled / rejected).
113 MOZ_CAN_RUN_SCRIPT
void OnDestClosed(JSContext
* aCx
, JS::Handle
<JS::Value
>);
114 MOZ_CAN_RUN_SCRIPT
void OnDestErrored(JSContext
* aCx
,
115 JS::Handle
<JS::Value
> aError
);
// Promise returned to the caller of ReadableStreamPipeTo; settled in Finalize.
117 RefPtr
<Promise
> mPromise
;
118 RefPtr
<ReadableStreamDefaultReader
> mReader
;
119 RefPtr
<WritableStreamDefaultWriter
> mWriter
;
// Used by ShutdownWithAction to wait for pending writes before running the
// shutdown action.
120 RefPtr
<Promise
> mLastWritePromise
;
121 const bool mPreventClose
;
122 const bool mPreventAbort
;
123 const bool mPreventCancel
;
// Set to true by ShutdownWithAction.
124 bool mShuttingDown
= false;
// Asserted to be false in OnDestClosed ("no chunks have been read").
126 bool mReadChunk
= false;
130 // This is a helper class for PipeToPump that allows it to attach
131 // member functions as promise handlers.
// It stores a strong reference to the pump plus two pointer-to-member
// callbacks, one for fulfillment (mResolved) and one for rejection (mRejected).
132 class PipeToPumpHandler final
: public PromiseNativeHandler
{
133 virtual ~PipeToPumpHandler() = default;
// Signature shared by every PipeToPump member usable as a handler.
135 using FunPtr
= void (PipeToPump::*)(JSContext
*, JS::Handle
<JS::Value
>);
137 RefPtr
<PipeToPump
> mPipeToPump
;
142 NS_DECL_CYCLE_COLLECTING_ISUPPORTS
143 NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPumpHandler
)
// NOTE(review): callers in this file pass nullptr for one of the two
// callbacks (see OnReadFulfilled and Read). Any null check guarding the
// invocations below is not visible in this excerpt — confirm it exists.
145 explicit PipeToPumpHandler(PipeToPump
* aPipeToPump
, FunPtr aResolved
,
147 : mPipeToPump(aPipeToPump
), mResolved(aResolved
), mRejected(aRejected
) {}
// Forwards the fulfillment value to the stored mResolved member function.
149 void ResolvedCallback(JSContext
* aCx
, JS::Handle
<JS::Value
> aValue
,
150 ErrorResult
&) override
{
152 (mPipeToPump
->*mResolved
)(aCx
, aValue
);
// Forwards the rejection reason to the stored mRejected member function.
156 void RejectedCallback(JSContext
* aCx
, JS::Handle
<JS::Value
> aReason
,
157 ErrorResult
&) override
{
159 (mPipeToPump
->*mRejected
)(aCx
, aReason
);
// Cycle collection: only mPipeToPump is listed as a collected member.
164 NS_IMPL_CYCLE_COLLECTION(PipeToPumpHandler
, mPipeToPump
)
165 NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPumpHandler
)
166 NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPumpHandler
)
167 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPumpHandler
)
168 NS_INTERFACE_MAP_ENTRY(nsISupports
)
// AbortFollower override. Initializes `jsapi` with the parent object of the
// reader's stream to obtain a JSContext, then runs PerformAbortAlgorithm with
// the signal returned by Signal(). The string below is the warning text for
// the case where that initialization fails.
171 void PipeToPump::RunAbortAlgorithm() {
173 if (!jsapi
.Init(mReader
->GetStream()->GetParentObject())) {
175 "Failed to initialize AutoJSAPI in PipeToPump::RunAbortAlgorithm");
178 JSContext
* cx
= jsapi
.cx();
// Hold the signal in a RefPtr for the duration of the call.
180 RefPtr
<AbortSignalImpl
> signal
= Signal();
181 PerformAbortAlgorithm(cx
, signal
);
// Implements the spec's abortAlgorithm: reads the signal's abort reason and
// shuts down with an action that aborts the destination and/or cancels the
// source (depending on mPreventAbort / mPreventCancel), passing the reason as
// the error.
// aSignal must already be aborted (asserted).
184 void PipeToPump::PerformAbortAlgorithm(JSContext
* aCx
,
185 AbortSignalImpl
* aSignal
) {
186 MOZ_ASSERT(aSignal
->Aborted());
188 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
189 // Step 14.1. Let abortAlgorithm be the following steps:
190 // Note: All the following steps are 14.1.xx
192 // Step 1. Let error be signal’s abort reason.
193 JS::Rooted
<JS::Value
> error(aCx
);
194 aSignal
->GetReason(aCx
, &error
);
// The action is a capture-less lambda so it can be passed where a
// ShutdownAction function pointer is expected; all state comes in through
// aPipeToPump and aError.
196 auto action
= [](JSContext
* aCx
, PipeToPump
* aPipeToPump
,
197 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
,
198 ErrorResult
& aRv
) MOZ_CAN_RUN_SCRIPT
{
// aError is dereferenced unconditionally; the caller below always passes
// Some(error).
199 JS::Rooted
<JS::Value
> error(aCx
, *aError
);
201 // Step 2. Let actions be an empty ordered set.
202 nsTArray
<RefPtr
<Promise
>> actions
;
204 // Step 3. If preventAbort is false, append the following action to actions:
205 if (!aPipeToPump
->mPreventAbort
) {
206 RefPtr
<WritableStream
> dest
= aPipeToPump
->mWriter
->GetStream();
208 // Step 3.1. If dest.[[state]] is "writable", return !
209 // WritableStreamAbort(dest, error).
210 if (dest
->State() == WritableStream::WriterState::Writable
) {
211 RefPtr
<Promise
> p
= WritableStreamAbort(aCx
, dest
, error
, aRv
);
// Early exit with a null promise (the guarding condition is not visible in
// this excerpt; presumably a failure check on aRv).
213 return already_AddRefed
<Promise
>();
215 actions
.AppendElement(p
);
218 // Step 3.2. Otherwise, return a promise resolved with undefined.
219 // Note: This is basically a no-op.
222 // Step 4. If preventCancel is false, append the following action to
224 if (!aPipeToPump
->mPreventCancel
) {
225 RefPtr
<ReadableStream
> source
= aPipeToPump
->mReader
->GetStream();
227 // Step 4.1. If source.[[state]] is "readable", return !
228 // ReadableStreamCancel(source, error).
229 if (source
->State() == ReadableStream::ReaderState::Readable
) {
230 RefPtr
<Promise
> p
= ReadableStreamCancel(aCx
, source
, error
, aRv
);
232 return already_AddRefed
<Promise
>();
234 actions
.AppendElement(p
);
237 // Step 4.2. Otherwise, return a promise resolved with undefined.
241 // Step 5. .. action consisting of getting a promise to wait for
242 // all of the actions in actions ...
243 return Promise::All(aCx
, actions
, aRv
);
246 // Step 5. Shutdown with an action consisting of getting a promise to wait for
247 // all of the actions in actions, and with error.
248 JS::Rooted
<Maybe
<JS::Value
>> someError(aCx
, Some(error
.get()));
249 ShutdownWithAction(aCx
, action
, someError
);
// Checks the current state of source and destination against the four
// propagation conditions, in spec order, and invokes the matching handler.
// Start() uses the bool result to decide whether to stop before piping begins.
// NOTE(review): the return statements are not visible in this excerpt —
// presumably true after any handler ran and false otherwise; confirm.
252 bool PipeToPump::SourceOrDestErroredOrClosed(JSContext
* aCx
) {
253 // (Constraint) Error and close states must be propagated:
254 // the following conditions must be applied in order.
255 RefPtr
<ReadableStream
> source
= mReader
->GetStream();
256 RefPtr
<WritableStream
> dest
= mWriter
->GetStream();
258 // Step 1. Errors must be propagated forward: if source.[[state]] is or
259 // becomes "errored", then
260 if (source
->State() == ReadableStream::ReaderState::Errored
) {
261 JS::Rooted
<JS::Value
> storedError(aCx
, source
->StoredError());
262 OnSourceErrored(aCx
, storedError
);
266 // Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes
268 if (dest
->State() == WritableStream::WriterState::Errored
) {
269 JS::Rooted
<JS::Value
> storedError(aCx
, dest
->StoredError());
270 OnDestErrored(aCx
, storedError
);
274 // Step 3. Closing must be propagated forward: if source.[[state]] is or
275 // becomes "closed", then
276 if (source
->State() == ReadableStream::ReaderState::Closed
) {
// The handler ignores its value argument, so undefined is passed.
277 OnSourceClosed(aCx
, JS::UndefinedHandleValue
);
281 // Step 4. Closing must be propagated backward:
282 // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
283 // or dest.[[state]] is "closed", then
284 if (dest
->CloseQueuedOrInFlight() ||
285 dest
->State() == WritableStream::WriterState::Closed
) {
286 OnDestClosed(aCx
, JS::UndefinedHandleValue
);
293 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
// Starts the pipe: runs the abort algorithm right away if the signal is
// already aborted, performs the one-off state check on source/destination, and
// then attaches handlers to the reader's and writer's closed promises.
// NOTE(review): aSignal is dereferenced below; a null check for the "signal is
// undefined" case is not visible in this excerpt — confirm it exists.
295 void PipeToPump::Start(JSContext
* aCx
, AbortSignal
* aSignal
) {
296 // Step 14. If signal is not undefined,
298 // Step 14.1. Let abortAlgorithm be the following steps:
299 // ... This is implemented by RunAbortAlgorithm.
301 // Step 14.2. If signal is aborted, perform abortAlgorithm and
303 if (aSignal
->Aborted()) {
304 PerformAbortAlgorithm(aCx
, aSignal
);
308 // Step 14.3. Add abortAlgorithm to signal.
312 // Step 15. In parallel but not really; see #905, using reader and writer,
313 // read all chunks from source and write them to dest.
314 // Due to the locking provided by the reader and writer,
315 // the exact manner in which this happens is not observable to author code,
316 // and so there is flexibility in how this is done.
318 // (Constraint) Error and close states must be propagated
320 // Before piping has started, we have to check for source/destination being
321 // errored/closed manually.
322 if (SourceOrDestErroredOrClosed(aCx
)) {
326 // We use the following two promises to propagate error and close states
// Reader closed promise: fulfilled -> OnSourceClosed, rejected ->
// OnSourceErrored.
328 RefPtr
<Promise
> readerClosed
= mReader
->ClosedPromise();
329 readerClosed
->AppendNativeHandler(new PipeToPumpHandler(
330 this, &PipeToPump::OnSourceClosed
, &PipeToPump::OnSourceErrored
));
332 // Note: Because we control the destination/writer it should never be closed
333 // after we did the initial check above with SourceOrDestErroredOrClosed.
// Writer closed promise: fulfilled -> OnDestClosed, rejected -> OnDestErrored.
334 RefPtr
<Promise
> writerClosed
= mWriter
->ClosedPromise();
335 writerClosed
->AppendNativeHandler(new PipeToPumpHandler(
336 this, &PipeToPump::OnDestClosed
, &PipeToPump::OnDestErrored
));
// Promise handler attached to PipeToPump::mLastWritePromise by
// ShutdownWithAction. It remembers the shutdown action and the optional error
// so the shutdown can continue once the last write has settled.
341 class WriteFinishedPromiseHandler final
: public PromiseNativeHandler
{
342 RefPtr
<PipeToPump
> mPipeToPump
;
343 PipeToPump::ShutdownAction mAction
;
// JS value kept alive through HoldJSObjects/DropJSObjects and listed as a JS
// member in the cycle-collection macro below.
345 JS::Heap
<JS::Value
> mError
;
// Pairs with the HoldJSObjects call in the constructor.
347 virtual ~WriteFinishedPromiseHandler() { mozilla::DropJSObjects(this); };
350 NS_DECL_CYCLE_COLLECTING_ISUPPORTS
351 NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(WriteFinishedPromiseHandler
)
// Records whether an error was supplied (mHasError) and registers this object
// as a JS holder.
// NOTE(review): the mHasError declaration and the assignment to mError are not
// visible in this excerpt.
353 explicit WriteFinishedPromiseHandler(
354 JSContext
* aCx
, PipeToPump
* aPipeToPump
,
355 PipeToPump::ShutdownAction aAction
,
356 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
)
357 : mPipeToPump(aPipeToPump
), mAction(aAction
) {
358 mHasError
= aError
.isSome();
362 mozilla::HoldJSObjects(this);
// Rebuilds the Maybe<Value> error and continues the shutdown on the pump.
// NOTE(review): the condition guarding `error = Some(mError)` is not visible
// here — presumably mHasError; confirm.
365 MOZ_CAN_RUN_SCRIPT
void WriteFinished(JSContext
* aCx
) {
366 RefPtr
<PipeToPump
> pipeToPump
= mPipeToPump
; // XXX known-live?
367 JS::Rooted
<Maybe
<JS::Value
>> error(aCx
);
369 error
= Some(mError
);
371 pipeToPump
->ShutdownWithActionAfterFinishedWrite(aCx
, mAction
, error
);
// NOTE(review): the bodies of both callbacks are not visible in this excerpt;
// presumably each calls WriteFinished(aCx) — confirm.
374 MOZ_CAN_RUN_SCRIPT
void ResolvedCallback(JSContext
* aCx
,
375 JS::Handle
<JS::Value
> aValue
,
376 ErrorResult
&) override
{
380 MOZ_CAN_RUN_SCRIPT
void RejectedCallback(JSContext
* aCx
,
381 JS::Handle
<JS::Value
> aReason
,
382 ErrorResult
&) override
{
// Cycle collection: mPipeToPump is the native member, mError the JS member.
387 NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(WriteFinishedPromiseHandler
,
388 (mPipeToPump
), (mError
))
389 NS_IMPL_CYCLE_COLLECTING_ADDREF(WriteFinishedPromiseHandler
)
390 NS_IMPL_CYCLE_COLLECTING_RELEASE(WriteFinishedPromiseHandler
)
391 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WriteFinishedPromiseHandler
)
392 NS_INTERFACE_MAP_ENTRY(nsISupports
)
395 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
396 // Shutdown with an action: if any of the above requirements ask to shutdown
397 // with an action action, optionally with an error originalError, then:
// aAction may be null (see Shutdown), in which case no action is performed.
// aError is the optional original error forwarded to finalization.
398 void PipeToPump::ShutdownWithAction(
399 JSContext
* aCx
, ShutdownAction aAction
,
400 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
) {
401 // Step 1. If shuttingDown is true, abort these substeps.
// NOTE(review): the early return for step 1 is not visible in this excerpt.
406 // Step 2. Set shuttingDown to true.
407 mShuttingDown
= true;
409 // Step 3. If dest.[[state]] is "writable" and !
410 // WritableStreamCloseQueuedOrInFlight(dest) is false,
411 RefPtr
<WritableStream
> dest
= mWriter
->GetStream();
412 if (dest
->State() == WritableStream::WriterState::Writable
&&
413 !dest
->CloseQueuedOrInFlight()) {
414 // Step 3.1. If any chunks have been read but not yet written, write them to
416 // Step 3.2. Wait until every chunk that has been read has been
417 // written (i.e. the corresponding promises have settled).
419 // Note: Write requests are processed in order, so when the promise
420 // for the last written chunk is settled all previous chunks have been
// Defer the rest of the shutdown until the last write promise settles.
422 if (mLastWritePromise
) {
423 mLastWritePromise
->AppendNativeHandler(
424 new WriteFinishedPromiseHandler(aCx
, this, aAction
, aError
));
429 // Don't have to wait for last write, immediately continue.
430 ShutdownWithActionAfterFinishedWrite(aCx
, aAction
, aError
);
// Promise handler attached to the promise returned by a shutdown action. It
// finalizes the pump when that promise settles: with the original error (if
// any) on fulfillment, or with the rejection reason on rejection.
433 class ShutdownActionFinishedPromiseHandler final
: public PromiseNativeHandler
{
434 RefPtr
<PipeToPump
> mPipeToPump
;
// JS value kept alive through HoldJSObjects/DropJSObjects.
436 JS::Heap
<JS::Value
> mError
;
// Pairs with the HoldJSObjects call in the constructor.
438 virtual ~ShutdownActionFinishedPromiseHandler() {
439 mozilla::DropJSObjects(this);
443 NS_DECL_CYCLE_COLLECTING_ISUPPORTS
444 NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(
445 ShutdownActionFinishedPromiseHandler
)
// Records whether an original error was supplied and registers this object as
// a JS holder.
// NOTE(review): the mHasError declaration and the assignment to mError are not
// visible in this excerpt.
447 explicit ShutdownActionFinishedPromiseHandler(
448 JSContext
* aCx
, PipeToPump
* aPipeToPump
,
449 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
)
450 : mPipeToPump(aPipeToPump
) {
451 mHasError
= aError
.isSome();
455 mozilla::HoldJSObjects(this);
458 void ResolvedCallback(JSContext
* aCx
, JS::Handle
<JS::Value
> aValue
,
459 ErrorResult
&) override
{
460 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
461 // Step 5. Upon fulfillment of p, finalize, passing along originalError if
// `error` starts out empty; the assignment below supplies the stored error
// (its guarding condition is not visible here — presumably mHasError).
463 JS::Rooted
<Maybe
<JS::Value
>> error(aCx
);
465 error
= Some(mError
);
467 mPipeToPump
->Finalize(aCx
, error
);
470 void RejectedCallback(JSContext
* aCx
, JS::Handle
<JS::Value
> aReason
,
471 ErrorResult
&) override
{
472 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
473 // Step 6. Upon rejection of p with reason newError, finalize with
// The rejection reason replaces any original error.
475 JS::Rooted
<Maybe
<JS::Value
>> error(aCx
, Some(aReason
));
476 mPipeToPump
->Finalize(aCx
, error
);
// Cycle collection: mPipeToPump is the native member, mError the JS member.
480 NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(ShutdownActionFinishedPromiseHandler
,
481 (mPipeToPump
), (mError
))
482 NS_IMPL_CYCLE_COLLECTING_ADDREF(ShutdownActionFinishedPromiseHandler
)
483 NS_IMPL_CYCLE_COLLECTING_RELEASE(ShutdownActionFinishedPromiseHandler
)
484 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ShutdownActionFinishedPromiseHandler
)
485 NS_INTERFACE_MAP_ENTRY(nsISupports
)
488 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
489 // Continuation after Step 3. triggered a promise resolution.
// Runs aAction and arranges for Finalize to be called once the promise it
// returns settles. If the action fails synchronously, the pending exception is
// converted to an error value and Finalize is called right away.
490 void PipeToPump::ShutdownWithActionAfterFinishedWrite(
491 JSContext
* aCx
, ShutdownAction aAction
,
492 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
) {
// NOTE(review): the condition guarding this Finalize call is not visible in
// this excerpt — presumably "aAction is null", matching Shutdown() which
// passes nullptr; confirm.
494 // Used to implement shutdown without action. Finalize immediately.
495 Finalize(aCx
, aError
);
499 // Step 4. Let p be the result of performing action.
// Hold a strong reference to this object across the action call.
500 RefPtr
<PipeToPump
> thisRefPtr
= this;
502 RefPtr
<Promise
> p
= aAction(aCx
, thisRefPtr
, aError
, rv
);
504 // Error while calling actions above, continue immediately with finalization.
505 if (rv
.MaybeSetPendingException(aCx
)) {
// Pull the exception back off the JSContext so it can be passed to Finalize as
// a value; someError stays empty if no exception can be retrieved.
506 JS::Rooted
<Maybe
<JS::Value
>> someError(aCx
);
508 JS::Rooted
<JS::Value
> error(aCx
);
509 if (JS_GetPendingException(aCx
, &error
)) {
510 someError
= Some(error
.get());
513 JS_ClearPendingException(aCx
);
515 Finalize(aCx
, someError
);
// Steps 5/6 are implemented by the handler's Resolved/Rejected callbacks.
520 p
->AppendNativeHandler(
521 new ShutdownActionFinishedPromiseHandler(aCx
, this, aError
));
524 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown
525 // Shutdown: if any of the above requirements or steps ask to shutdown,
526 // optionally with an error error, then:
// aError is the optional error that is forwarded to finalization.
527 void PipeToPump::Shutdown(JSContext
* aCx
,
528 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
) {
529 // Note: We implement "shutdown" in terms of "shutdown with action".
530 // We can observe that when passing along an action that always succeeds
531 // shutdown with action and shutdown have the same behavior, when
532 // Ignoring the potential micro task for the promise that we skip anyway.
// A null action means "no action".
533 ShutdownWithAction(aCx
, nullptr, aError
);
536 // https://streams.spec.whatwg.org/#rs-pipeTo-finalize
537 // Finalize: both forms of shutdown will eventually ask to finalize,
538 // optionally with an error error, which means to perform the following steps:
// Releases the writer and the reader, then settles mPromise: rejected with
// *aError when aError holds a value, resolved with undefined otherwise.
539 void PipeToPump::Finalize(JSContext
* aCx
,
540 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
) {
// Failures of the release calls are only reported through the warning
// assertions below; `rv` is an IgnoredErrorResult.
541 IgnoredErrorResult rv
;
542 // Step 1. Perform ! WritableStreamDefaultWriterRelease(writer).
543 WritableStreamDefaultWriterRelease(aCx
, mWriter
, rv
);
544 NS_WARNING_ASSERTION(!rv
.Failed(),
545 "WritableStreamDefaultWriterRelease should not fail.");
547 // Step 2. If reader implements ReadableStreamBYOBReader,
548 // perform ! ReadableStreamBYOBReaderRelease(reader).
549 // Note: We always use a default reader.
550 MOZ_ASSERT(!mReader
->IsBYOB());
552 // Step 3. Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
553 ReadableStreamDefaultReaderRelease(aCx
, mReader
, rv
);
// NOTE(review): the message below names ReadableStreamReaderGenericRelease,
// but the call above is ReadableStreamDefaultReaderRelease — confirm which
// name the warning should carry.
554 NS_WARNING_ASSERTION(!rv
.Failed(),
555 "ReadableStreamReaderGenericRelease should not fail.");
557 // Step 4. If signal is not undefined, remove abortAlgorithm from signal.
562 // Step 5. If error was given, reject promise with error.
563 if (aError
.isSome()) {
564 JS::Rooted
<JS::Value
> error(aCx
, *aError
);
565 mPromise
->MaybeReject(error
);
567 // Step 6. Otherwise, resolve promise with undefined.
568 mPromise
->MaybeResolveWithUndefined();
571 // Remove all references.
// NOTE(review): only the reset of mLastWritePromise is visible in this
// excerpt; other member resets may have been dropped.
575 mLastWritePromise
= nullptr;
// Called (via PipeToReadRequest::ChunkSteps) with a chunk read from the
// source. Writes the chunk to the writer and attaches OnDestErrored as the
// rejection handler of mLastWritePromise.
// NOTE(review): several statements are not visible in this excerpt (the
// shutdown check described by the comments below, the assignment of the write
// promise to mLastWritePromise, the failure handling around aRv, and the call
// that starts the next read) — confirm against the full file.
579 void PipeToPump::OnReadFulfilled(JSContext
* aCx
, JS::Handle
<JS::Value
> aChunk
,
581 // (Constraint) Shutdown must stop activity:
582 // if shuttingDown becomes true, the user agent must not initiate further
583 // reads from reader, and must only perform writes of already-read chunks ...
585 // We may reach this point after |On{Source,Dest}{Clos,Error}ed| has responded
586 // to an out-of-band change. Per the comment in |OnSourceErrored|, we want to
587 // allow the implicated shutdown to proceed, and we don't want to interfere
588 // with or additionally alter its operation. Particularly, we don't want to
589 // queue up the successfully-read chunk (if there was one, and this isn't just
590 // reporting "done") to be written: it wasn't "already-read" when that
591 // error/closure happened.
593 // All specified reactions to a closure/error invoke either the shutdown, or
594 // shutdown with an action, algorithms. Those algorithms each abort if either
595 // shutdown algorithm has already been invoked. So we check for shutdown here
596 // in case of asynchronous closure/error and abort if shutdown has already
597 // started (and possibly finished).
599 // TODO: Implement the eventual resolution from
600 // https://github.com/whatwg/streams/issues/1207
// Hold the writer in a local RefPtr for the duration of the write call.
605 RefPtr
<WritableStreamDefaultWriter
> writer
= mWriter
;
607 WritableStreamDefaultWriterWrite(aCx
, writer
, aChunk
, aRv
);
609 mLastWritePromise
= nullptr;
// Only rejection is handled here (the resolved callback is nullptr).
613 mLastWritePromise
->AppendNativeHandler(
614 new PipeToPumpHandler(this, nullptr, &PipeToPump::OnDestErrored
));
616 // Last read has finished, so it's time to start reading again.
// Fulfillment handler for the writer's ready promise, attached in Read() when
// the desired size is <= 0. The value argument is unused (unnamed).
// NOTE(review): the body is not visible in this excerpt; per the comment below
// it presumably calls Read(aCx) — confirm.
620 void PipeToPump::OnWriterReady(JSContext
* aCx
, JS::Handle
<JS::Value
>) {
621 // Writer is ready again (i.e. backpressure was resolved), so read.
// ReadRequest used by PipeToPump::Read. Chunk delivery is forwarded to the
// pump; close and error notifications are deliberately ignored here.
625 struct PipeToReadRequest
: public ReadRequest
{
627 NS_DECL_ISUPPORTS_INHERITED
628 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(PipeToReadRequest
, ReadRequest
)
// Strong reference back to the pump; traversed and unlinked by the
// cycle-collection macros below.
630 RefPtr
<PipeToPump
> mPipeToPump
;
632 explicit PipeToReadRequest(PipeToPump
* aPipeToPump
)
633 : mPipeToPump(aPipeToPump
) {}
// Hands the chunk to PipeToPump::OnReadFulfilled, holding the pump in a local
// RefPtr during the call.
635 MOZ_CAN_RUN_SCRIPT_BOUNDARY
void ChunkSteps(JSContext
* aCx
,
636 JS::Handle
<JS::Value
> aChunk
,
637 ErrorResult
& aRv
) override
{
638 RefPtr
<PipeToPump
> pipeToPump
= mPipeToPump
; // XXX known live?
639 pipeToPump
->OnReadFulfilled(aCx
, aChunk
, aRv
);
642 // The reader's closed promise handlers will already call OnSourceClosed/
643 // OnSourceErrored, so these steps can just be ignored.
644 void CloseSteps(JSContext
* aCx
, ErrorResult
& aRv
) override
{}
645 void ErrorSteps(JSContext
* aCx
, JS::Handle
<JS::Value
> aError
,
646 ErrorResult
& aRv
) override
{}
649 virtual ~PipeToReadRequest() = default;
// Cycle-collection boilerplate: mPipeToPump is unlinked and traversed on top
// of what ReadRequest already handles.
652 NS_IMPL_CYCLE_COLLECTION_CLASS(PipeToReadRequest
)
654 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN_INHERITED(PipeToReadRequest
, ReadRequest
)
655 NS_IMPL_CYCLE_COLLECTION_UNLINK(mPipeToPump
)
656 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
658 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(PipeToReadRequest
,
660 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mPipeToPump
)
661 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
663 NS_IMPL_ADDREF_INHERITED(PipeToReadRequest
, ReadRequest
)
664 NS_IMPL_RELEASE_INHERITED(PipeToReadRequest
, ReadRequest
)
666 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToReadRequest
)
667 NS_INTERFACE_MAP_END_INHERITING(ReadRequest
)
// Issues one read from the source, subject to backpressure:
//  - a null desired size is left to the writer's closed-promise handlers;
//  - a desired size <= 0 defers the read until the writer's ready promise
//    fulfills (OnWriterReady);
//  - otherwise a PipeToReadRequest is submitted to the reader.
// A synchronous read failure is converted to an error value and passed to
// Shutdown.
// NOTE(review): the early returns and the shutdown check implied by the
// comments are not visible in this excerpt.
669 void PipeToPump::Read(JSContext
* aCx
) {
674 // (Constraint) Shutdown must stop activity:
675 // If shuttingDown becomes true, the user agent must not initiate
676 // further reads from reader
681 // (Constraint) Backpressure must be enforced:
682 // While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null,
683 // the user agent must not read from reader.
684 Nullable
<double> desiredSize
=
685 WritableStreamDefaultWriterGetDesiredSize(mWriter
);
686 if (desiredSize
.IsNull()) {
687 // This means the writer has errored. This is going to be handled
688 // by the writer closed promise.
692 if (desiredSize
.Value() <= 0) {
693 // Wait for the writer to become ready before reading more data from
694 // the reader. We don't care about rejections here, because those are
695 // already handled by the writer closed promise.
696 RefPtr
<Promise
> readyPromise
= mWriter
->Ready();
// Only fulfillment is handled (the rejected callback is nullptr).
697 readyPromise
->AppendNativeHandler(
698 new PipeToPumpHandler(this, &PipeToPump::OnWriterReady
, nullptr));
// Hold reader and request in local RefPtrs for the duration of the read call.
702 RefPtr
<ReadableStreamDefaultReader
> reader
= mReader
;
703 RefPtr
<ReadRequest
> request
= new PipeToReadRequest(this);
705 ReadableStreamDefaultReaderRead(aCx
, reader
, request
, rv
);
706 if (rv
.MaybeSetPendingException(aCx
)) {
707 // XXX It's actually not quite obvious what we should do here.
708 // We've got an error during reading, so on the surface it seems logical
709 // to invoke `OnSourceErrored`. However in certain cases the required
710 // condition > source.[[state]] is or becomes "errored" < won't actually
711 // happen i.e. when `WritableStreamDefaultWriterWrite` called from
712 // `OnReadFulfilled` (via PipeToReadRequest::ChunkSteps) fails in
713 // a synchronous fashion.
714 JS::Rooted
<JS::Value
> error(aCx
);
715 JS::Rooted
<Maybe
<JS::Value
>> someError(aCx
);
717 // The error was moved to the JSContext by MaybeSetPendingException.
718 if (JS_GetPendingException(aCx
, &error
)) {
719 someError
= Some(error
.get());
// Clear the exception from the context; the error value is passed to Shutdown
// instead.
722 JS_ClearPendingException(aCx
);
724 Shutdown(aCx
, someError
);
728 // Step 3. Closing must be propagated forward: if source.[[state]] is or
729 // becomes "closed", then
// Fulfillment handler of the reader's closed promise; also called directly by
// SourceOrDestErroredOrClosed. The value argument is unused. No error is
// passed in either branch (JS::NothingHandleValue).
730 void PipeToPump::OnSourceClosed(JSContext
* aCx
, JS::Handle
<JS::Value
>) {
731 // Step 3.1. If preventClose is false, shutdown with an action of
732 // ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
733 if (!mPreventClose
) {
// NOTE(review): the ShutdownWithAction call that receives this lambda, and the
// arguments of the close call inside it, are partly missing from this excerpt.
736 [](JSContext
* aCx
, PipeToPump
* aPipeToPump
,
737 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
, ErrorResult
& aRv
)
739 RefPtr
<WritableStreamDefaultWriter
> writer
= aPipeToPump
->mWriter
;
740 return WritableStreamDefaultWriterCloseWithErrorPropagation(
743 JS::NothingHandleValue
);
745 // Step 3.2 Otherwise, shutdown.
746 Shutdown(aCx
, JS::NothingHandleValue
);
750 // Step 1. Errors must be propagated forward: if source.[[state]] is or
751 // becomes "errored", then
// Rejection handler of the reader's closed promise; also called directly by
// SourceOrDestErroredOrClosed with the source's stored error.
752 void PipeToPump::OnSourceErrored(JSContext
* aCx
,
753 JS::Handle
<JS::Value
> aSourceStoredError
) {
754 // If |source| becomes errored not during a pending read, it's clear we must
755 // react immediately.
757 // But what if |source| becomes errored *during* a pending read? Should this
758 // first error, or the pending-read second error, predominate? Two semantics
759 // are possible when |source|/|dest| become closed or errored while there's a
762 // 1. Wait until the read fulfills or rejects, then respond to the
763 // closure/error without regard to the read having fulfilled or rejected.
764 // (This will simply not react to the read being rejected, or it will
765 // queue up the read chunk to be written during shutdown.)
766 // 2. React to the closure/error immediately per "Error and close states
767 // must be propagated". Then when the read fulfills or rejects later, do
770 // The spec doesn't clearly require either semantics. It requires that
771 // *already-read* chunks be written (at least if |dest| didn't become errored
772 // or closed such that no further writes can occur). But it's silent as to
773 // not-fully-read chunks. (These semantic differences may only be observable
774 // with very carefully constructed readable/writable streams.)
776 // It seems best, generally, to react to the temporally-earliest problem that
777 // arises, so we implement option #2. (Blink, in contrast, currently
778 // implements option #1.)
780 // All specified reactions to a closure/error invoke either the shutdown, or
781 // shutdown with an action, algorithms. Those algorithms each abort if either
782 // shutdown algorithm has already been invoked. So we don't need to do
783 // anything special here to deal with a pending read.
785 // TODO: Implement the eventual resolution from
786 // https://github.com/whatwg/streams/issues/1207
788 // Step 1.1 If preventAbort is false, shutdown with an action of
789 // ! WritableStreamAbort(dest, source.[[storedError]])
790 // and with source.[[storedError]].
791 JS::Rooted
<Maybe
<JS::Value
>> error(aCx
, Some(aSourceStoredError
));
792 if (!mPreventAbort
) {
// The action aborts the destination stream with the error it is given.
795 [](JSContext
* aCx
, PipeToPump
* aPipeToPump
,
796 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
, ErrorResult
& aRv
)
798 JS::Rooted
<JS::Value
> error(aCx
, *aError
);
799 RefPtr
<WritableStream
> dest
= aPipeToPump
->mWriter
->GetStream();
800 return WritableStreamAbort(aCx
, dest
, error
, aRv
);
804 // Step 1.2. Otherwise, shutdown with source.[[storedError]].
805 Shutdown(aCx
, error
);
809 // Step 4. Closing must be propagated backward:
810 // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
811 // or dest.[[state]] is "closed", then
// Fulfillment handler of the writer's closed promise; also called directly by
// SourceOrDestErroredOrClosed. Builds a TypeError value ("destClosed") and
// shuts down with it, cancelling the source unless mPreventCancel is set.
812 void PipeToPump::OnDestClosed(JSContext
* aCx
, JS::Handle
<JS::Value
>) {
813 // Step 4.1. Assert: no chunks have been read or written.
814 // Note: No reading automatically implies no writing.
815 // In a perfect world OnDestClosed would only be called before we start
816 // piping, because afterwards the writer has an exclusive lock on the stream.
817 // In reality the closed promise can still be resolved after we release
818 // the lock on the writer in Finalize.
822 MOZ_ASSERT(!mReadChunk
);
824 // Step 4.2. Let destClosed be a new TypeError.
825 JS::Rooted
<Maybe
<JS::Value
>> destClosed(aCx
, Nothing());
// The TypeError is thrown into `rv` and then converted to a JS value;
// the conversion is release-asserted to succeed.
828 rv
.ThrowTypeError("Cannot pipe to closed stream");
829 JS::Rooted
<JS::Value
> error(aCx
);
830 bool ok
= ToJSValue(aCx
, std::move(rv
), &error
);
831 MOZ_RELEASE_ASSERT(ok
, "must be ok");
832 destClosed
= Some(error
.get());
835 // Step 4.3. If preventCancel is false, shutdown with an action of
836 // ! ReadableStreamCancel(source, destClosed) and with destClosed.
837 if (!mPreventCancel
) {
840 [](JSContext
* aCx
, PipeToPump
* aPipeToPump
,
841 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
, ErrorResult
& aRv
)
843 JS::Rooted
<JS::Value
> error(aCx
, *aError
);
// NOTE(review): this local is named `dest` but holds the reader's stream,
// i.e. the source that is being cancelled.
844 RefPtr
<ReadableStream
> dest
= aPipeToPump
->mReader
->GetStream();
845 return ReadableStreamCancel(aCx
, dest
, error
, aRv
);
849 // Step 4.4. Otherwise, shutdown with destClosed.
850 Shutdown(aCx
, destClosed
);
854 // Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes
// Rejection handler of the writer's closed promise and of the last write
// promise; also called directly by SourceOrDestErroredOrClosed with the
// destination's stored error.
856 void PipeToPump::OnDestErrored(JSContext
* aCx
,
857 JS::Handle
<JS::Value
> aDestStoredError
) {
858 // Step 2.1. If preventCancel is false, shutdown with an action of
859 // ! ReadableStreamCancel(source, dest.[[storedError]])
860 // and with dest.[[storedError]].
861 JS::Rooted
<Maybe
<JS::Value
>> error(aCx
, Some(aDestStoredError
));
862 if (!mPreventCancel
) {
865 [](JSContext
* aCx
, PipeToPump
* aPipeToPump
,
866 JS::Handle
<mozilla::Maybe
<JS::Value
>> aError
, ErrorResult
& aRv
)
868 JS::Rooted
<JS::Value
> error(aCx
, *aError
);
// NOTE(review): this local is named `dest` but holds the reader's stream,
// i.e. the source that is being cancelled.
869 RefPtr
<ReadableStream
> dest
= aPipeToPump
->mReader
->GetStream();
870 return ReadableStreamCancel(aCx
, dest
, error
, aRv
);
874 // Step 2.2. Otherwise, shutdown with dest.[[storedError]].
875 Shutdown(aCx
, error
);
// Cycle-collection and nsISupports boilerplate for PipeToPump. The four
// RefPtr members (mPromise, mReader, mWriter, mLastWritePromise) are listed
// for both traversal and unlinking.
// NOTE(review): any handling of the AbortFollower base in traverse/unlink is
// not visible in this excerpt.
879 NS_IMPL_CYCLE_COLLECTION_CLASS(PipeToPump
)
880 NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPump
)
881 NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPump
)
882 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPump
)
883 NS_INTERFACE_MAP_ENTRY(nsISupports
)
886 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(PipeToPump
)
887 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mPromise
)
888 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader
)
889 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mWriter
)
890 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mLastWritePromise
)
891 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
893 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(PipeToPump
)
894 NS_IMPL_CYCLE_COLLECTION_UNLINK(mPromise
)
895 NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader
)
896 NS_IMPL_CYCLE_COLLECTION_UNLINK(mWriter
)
897 NS_IMPL_CYCLE_COLLECTION_UNLINK(mLastWritePromise
)
898 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
900 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
// Public entry point. Acquires a default reader on aSource and a default
// writer on aDest, marks the source as disturbed, creates the result promise,
// and hands everything to a new PipeToPump which is then started.
//   aSource / aDest  - streams to connect; both asserted to be unlocked.
//   aPreventClose / aPreventAbort / aPreventCancel - pipe options forwarded to
//                      the pump.
//   aSignal          - abort signal forwarded to PipeToPump::Start.
//   aRv              - receives "Internal error" if `jsapi` cannot be
//                      initialized, and is passed to the acquire/create calls.
// Returns the promise that the pump settles when the pipe is finalized.
// NOTE(review): the early returns after the aRv-taking calls are not visible
// in this excerpt.
901 already_AddRefed
<Promise
> ReadableStreamPipeTo(
902 ReadableStream
* aSource
, WritableStream
* aDest
, bool aPreventClose
,
903 bool aPreventAbort
, bool aPreventCancel
, AbortSignal
* aSignal
,
904 mozilla::ErrorResult
& aRv
) {
905 // Step 1. Assert: source implements ReadableStream. (Implicit)
906 // Step 2. Assert: dest implements WritableStream. (Implicit)
907 // Step 3. Assert: preventClose, preventAbort, and preventCancel are all
908 // booleans (Implicit)
909 // Step 4. If signal was not given, let signal be
910 // undefined. (Implicit)
911 // Step 5. Assert: either signal is undefined, or signal
912 // implements AbortSignal. (Implicit)
913 // Step 6. Assert: !IsReadableStreamLocked(source) is false.
914 MOZ_ASSERT(!IsReadableStreamLocked(aSource
));
916 // Step 7. Assert: !IsWritableStreamLocked(dest) is false.
917 MOZ_ASSERT(!IsWritableStreamLocked(aDest
));
// A JSContext is obtained by initializing `jsapi` with the source's parent
// object.
920 if (!jsapi
.Init(aSource
->GetParentObject())) {
921 aRv
.ThrowUnknownError("Internal error");
924 JSContext
* cx
= jsapi
.cx();
926 // Step 8. If source.[[controller]] implements ReadableByteStreamController,
927 // let reader be either !AcquireReadableStreamBYOBReader(source) or
928 // !AcquireReadableStreamDefaultReader(source), at the user agent’s
930 // Step 9. Otherwise, let reader be
931 // !AcquireReadableStreamDefaultReader(source).
933 // Note: In the interests of simplicity, we choose here to always acquire
935 RefPtr
<ReadableStreamDefaultReader
> reader
=
936 AcquireReadableStreamDefaultReader(aSource
, aRv
);
941 // Step 10. Let writer be ! AcquireWritableStreamDefaultWriter(dest).
942 RefPtr
<WritableStreamDefaultWriter
> writer
=
943 AcquireWritableStreamDefaultWriter(aDest
, aRv
);
948 // Step 11. Set source.[[disturbed]] to true.
949 aSource
->SetDisturbed(true);
951 // Step 12. Let shuttingDown be false.
952 // Note: PipeToPump ensures this by construction.
954 // Step 13. Let promise be a new promise.
955 RefPtr
<Promise
> promise
= Promise::Create(aSource
->GetParentObject(), aRv
);
// Steps 14-15 are carried out by the pump.
961 RefPtr
<PipeToPump
> pump
= new PipeToPump(
962 promise
, reader
, writer
, aPreventClose
, aPreventAbort
, aPreventCancel
);
963 pump
->Start(cx
, aSignal
);
965 // Step 16. Return promise.
966 return promise
.forget();
969 } // namespace mozilla::dom