Bug 1769952 - Fix running raptor on a Win10-64 VM r=sparky
[gecko.git] / dom / streams / ReadableStreamPipeTo.cpp
blob5aafe1d1e1372af94f9fd9c5046dbc8e3ce8cd10
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/dom/AbortFollower.h"
8 #include "mozilla/dom/AbortSignal.h"
9 #include "mozilla/dom/ReadableStream.h"
10 #include "mozilla/dom/ReadableStreamDefaultReader.h"
11 #include "mozilla/dom/ReadableStreamPipeTo.h"
12 #include "mozilla/dom/WritableStream.h"
13 #include "mozilla/dom/WritableStreamDefaultWriter.h"
14 #include "mozilla/dom/Promise.h"
15 #include "mozilla/dom/PromiseNativeHandler.h"
16 #include "mozilla/AlreadyAddRefed.h"
17 #include "mozilla/ErrorResult.h"
18 #include "nsCycleCollectionParticipant.h"
19 #include "nsISupportsImpl.h"
21 #include "js/Exception.h"
23 namespace mozilla::dom {
25 struct PipeToReadRequest;
26 class WriteFinishedPromiseHandler;
27 class ShutdownActionFinishedPromiseHandler;
29 // TODO: Bug 1756794
30 using ::ImplCycleCollectionUnlink;
32 // https://streams.spec.whatwg.org/#readable-stream-pipe-to (Steps 14-15.)
34 // This class implements everything that is required to read all chunks from
35 // the reader (source) and write them to writer (destination), while
36 // following the constraints given in the spec using our implementation-defined
37 // behavior.
// The cycle-collected references look roughly like this:
// clang-format off
//
// Closed promise <-- ReadableStreamDefaultReader <--> ReadableStream
//         |                  ^              |
//         |(PromiseHandler)  |(mReader)     |(ReadRequest)
//         |                  |              |
//         |-------------> PipeToPump <-------
//                         ^  |   |
//         |---------------|  |   |
//         |                  |   |-------(mLastWrite) -------->
//         |(PromiseHandler)  |   |< ---- (PromiseHandler) ---- Promise
//         |                  |                                   ^
//         |                  |(mWriter)                          |(mWriteRequests)
//         |                  v                                   |
// Closed promise <-- WritableStreamDefaultWriter <--------> WritableStream
//
// clang-format on
// Drives a single pipe operation: chunks are pulled through mReader, pushed
// through mWriter, and mPromise is settled when the pipe is finalized.
// Inherits AbortFollower so that a followed signal can trigger
// RunAbortAlgorithm().
class PipeToPump final : public AbortFollower {
  NS_DECL_CYCLE_COLLECTING_ISUPPORTS
  NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPump)

  // These helpers forward promise/read-request callbacks into the private
  // member functions below.
  friend struct PipeToReadRequest;
  friend class WriteFinishedPromiseHandler;
  friend class ShutdownActionFinishedPromiseHandler;

  // Stores the pipe's promise, reader, writer and the three prevent* options.
  // No work happens until Start() is called.
  PipeToPump(Promise* aPromise, ReadableStreamDefaultReader* aReader,
             WritableStreamDefaultWriter* aWriter, bool aPreventClose,
             bool aPreventAbort, bool aPreventCancel)
      : mPromise(aPromise),
        mReader(aReader),
        mWriter(aWriter),
        mPreventClose(aPreventClose),
        mPreventAbort(aPreventAbort),
        mPreventCancel(aPreventCancel) {}

  // Steps 14-15 of pipeTo: hooks up |aSignal| (may be null), checks for
  // already closed/errored endpoints, then starts reading.
  MOZ_CAN_RUN_SCRIPT void Start(JSContext* aCx, AbortSignal* aSignal);

  // AbortFollower override; runs the abort algorithm for the followed signal.
  MOZ_CAN_RUN_SCRIPT_BOUNDARY void RunAbortAlgorithm() override;

 private:
  ~PipeToPump() override = default;

  // The abort algorithm proper; |aSignal| is asserted to be aborted.
  MOZ_CAN_RUN_SCRIPT void PerformAbortAlgorithm(JSContext* aCx,
                                                AbortSignalImpl* aSignal);

  // Checks the four propagation conditions in spec order and starts the
  // matching shutdown. Returns true if one of them applied.
  MOZ_CAN_RUN_SCRIPT bool SourceOrDestErroredOrClosed(JSContext* aCx);

  // Signature of a shutdown action. A null ShutdownAction means "shutdown
  // without an action": finalization happens without waiting on a promise.
  using ShutdownAction = already_AddRefed<Promise> (*)(
      JSContext*, PipeToPump*, JS::Handle<mozilla::Maybe<JS::Value>>,
      ErrorResult&);

  // "Shutdown with an action"; a no-op if shutdown already started.
  MOZ_CAN_RUN_SCRIPT void ShutdownWithAction(
      JSContext* aCx, ShutdownAction aAction,
      JS::Handle<mozilla::Maybe<JS::Value>> aError);
  // Continuation of ShutdownWithAction once the last write has settled (or
  // there was nothing to wait for).
  MOZ_CAN_RUN_SCRIPT void ShutdownWithActionAfterFinishedWrite(
      JSContext* aCx, ShutdownAction aAction,
      JS::Handle<mozilla::Maybe<JS::Value>> aError);

  // Plain "shutdown"; implemented as ShutdownWithAction with a null action.
  MOZ_CAN_RUN_SCRIPT void Shutdown(
      JSContext* aCx, JS::Handle<mozilla::Maybe<JS::Value>> aError);

  // Releases reader and writer, stops following the signal, settles mPromise
  // (rejecting with |aError| if present) and drops all strong references.
  void Finalize(JSContext* aCx, JS::Handle<mozilla::Maybe<JS::Value>> aError);

  // A chunk was read: write it to the destination and read again.
  MOZ_CAN_RUN_SCRIPT void OnReadFulfilled(JSContext* aCx,
                                          JS::Handle<JS::Value> aChunk,
                                          ErrorResult& aRv);
  // The writer's ready promise resolved: resume reading.
  MOZ_CAN_RUN_SCRIPT void OnWriterReady(JSContext* aCx, JS::Handle<JS::Value>);
  // Issues the next read unless shutting down or blocked by backpressure.
  MOZ_CAN_RUN_SCRIPT void Read(JSContext* aCx);

  // Reactions to the source's closed promise.
  MOZ_CAN_RUN_SCRIPT void OnSourceClosed(JSContext* aCx, JS::Handle<JS::Value>);
  MOZ_CAN_RUN_SCRIPT void OnSourceErrored(JSContext* aCx,
                                          JS::Handle<JS::Value> aError);

  // Reactions to the destination's closed promise (OnDestErrored is also
  // attached to every write promise).
  MOZ_CAN_RUN_SCRIPT void OnDestClosed(JSContext* aCx, JS::Handle<JS::Value>);
  MOZ_CAN_RUN_SCRIPT void OnDestErrored(JSContext* aCx,
                                        JS::Handle<JS::Value> aError);

  // Promise handed to the pipeTo() caller; settled in Finalize().
  RefPtr<Promise> mPromise;
  RefPtr<ReadableStreamDefaultReader> mReader;
  RefPtr<WritableStreamDefaultWriter> mWriter;
  // Promise for the most recently issued write; shutdown waits on it.
  RefPtr<Promise> mLastWritePromise;
  const bool mPreventClose;
  const bool mPreventAbort;
  const bool mPreventCancel;
  // Set once by ShutdownWithAction(); later shutdown requests are ignored.
  bool mShuttingDown = false;
#ifdef DEBUG
  // Debug-only: set when Read() is entered; asserted false in OnDestClosed().
  bool mReadChunk = false;
#endif
};
130 // This is a helper class for PipeToPump that allows it to attach
131 // member functions as promise handlers.
132 class PipeToPumpHandler final : public PromiseNativeHandler {
133 virtual ~PipeToPumpHandler() = default;
135 using FunPtr = void (PipeToPump::*)(JSContext*, JS::Handle<JS::Value>);
137 RefPtr<PipeToPump> mPipeToPump;
138 FunPtr mResolved;
139 FunPtr mRejected;
141 public:
142 NS_DECL_CYCLE_COLLECTING_ISUPPORTS
143 NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPumpHandler)
145 explicit PipeToPumpHandler(PipeToPump* aPipeToPump, FunPtr aResolved,
146 FunPtr aRejected)
147 : mPipeToPump(aPipeToPump), mResolved(aResolved), mRejected(aRejected) {}
149 void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue,
150 ErrorResult&) override {
151 if (mResolved) {
152 (mPipeToPump->*mResolved)(aCx, aValue);
156 void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aReason,
157 ErrorResult&) override {
158 if (mRejected) {
159 (mPipeToPump->*mRejected)(aCx, aReason);
164 NS_IMPL_CYCLE_COLLECTION(PipeToPumpHandler, mPipeToPump)
165 NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPumpHandler)
166 NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPumpHandler)
167 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPumpHandler)
168 NS_INTERFACE_MAP_ENTRY(nsISupports)
169 NS_INTERFACE_MAP_END
171 void PipeToPump::RunAbortAlgorithm() {
172 AutoJSAPI jsapi;
173 if (!jsapi.Init(mReader->GetStream()->GetParentObject())) {
174 NS_WARNING(
175 "Failed to initialize AutoJSAPI in PipeToPump::RunAbortAlgorithm");
176 return;
178 JSContext* cx = jsapi.cx();
180 RefPtr<AbortSignalImpl> signal = Signal();
181 PerformAbortAlgorithm(cx, signal);
184 void PipeToPump::PerformAbortAlgorithm(JSContext* aCx,
185 AbortSignalImpl* aSignal) {
186 MOZ_ASSERT(aSignal->Aborted());
188 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
189 // Step 14.1. Let abortAlgorithm be the following steps:
190 // Note: All the following steps are 14.1.xx
192 // Step 1. Let error be signal’s abort reason.
193 JS::Rooted<JS::Value> error(aCx);
194 aSignal->GetReason(aCx, &error);
196 auto action = [](JSContext* aCx, PipeToPump* aPipeToPump,
197 JS::Handle<mozilla::Maybe<JS::Value>> aError,
198 ErrorResult& aRv) MOZ_CAN_RUN_SCRIPT {
199 JS::Rooted<JS::Value> error(aCx, *aError);
201 // Step 2. Let actions be an empty ordered set.
202 nsTArray<RefPtr<Promise>> actions;
204 // Step 3. If preventAbort is false, append the following action to actions:
205 if (!aPipeToPump->mPreventAbort) {
206 RefPtr<WritableStream> dest = aPipeToPump->mWriter->GetStream();
208 // Step 3.1. If dest.[[state]] is "writable", return !
209 // WritableStreamAbort(dest, error).
210 if (dest->State() == WritableStream::WriterState::Writable) {
211 RefPtr<Promise> p = WritableStreamAbort(aCx, dest, error, aRv);
212 if (aRv.Failed()) {
213 return already_AddRefed<Promise>();
215 actions.AppendElement(p);
218 // Step 3.2. Otherwise, return a promise resolved with undefined.
219 // Note: This is basically a no-op.
222 // Step 4. If preventCancel is false, append the following action action to
223 // actions:
224 if (!aPipeToPump->mPreventCancel) {
225 RefPtr<ReadableStream> source = aPipeToPump->mReader->GetStream();
227 // Step 4.1. If source.[[state]] is "readable", return !
228 // ReadableStreamCancel(source, error).
229 if (source->State() == ReadableStream::ReaderState::Readable) {
230 RefPtr<Promise> p = ReadableStreamCancel(aCx, source, error, aRv);
231 if (aRv.Failed()) {
232 return already_AddRefed<Promise>();
234 actions.AppendElement(p);
237 // Step 4.2. Otherwise, return a promise resolved with undefined.
238 // No-op again.
241 // Step 5. .. action consisting of getting a promise to wait for
242 // all of the actions in actions ...
243 return Promise::All(aCx, actions, aRv);
246 // Step 5. Shutdown with an action consisting of getting a promise to wait for
247 // all of the actions in actions, and with error.
248 JS::Rooted<Maybe<JS::Value>> someError(aCx, Some(error.get()));
249 ShutdownWithAction(aCx, action, someError);
252 bool PipeToPump::SourceOrDestErroredOrClosed(JSContext* aCx) {
253 // (Constraint) Error and close states must be propagated:
254 // the following conditions must be applied in order.
255 RefPtr<ReadableStream> source = mReader->GetStream();
256 RefPtr<WritableStream> dest = mWriter->GetStream();
258 // Step 1. Errors must be propagated forward: if source.[[state]] is or
259 // becomes "errored", then
260 if (source->State() == ReadableStream::ReaderState::Errored) {
261 JS::Rooted<JS::Value> storedError(aCx, source->StoredError());
262 OnSourceErrored(aCx, storedError);
263 return true;
266 // Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes
267 // "errored", then
268 if (dest->State() == WritableStream::WriterState::Errored) {
269 JS::Rooted<JS::Value> storedError(aCx, dest->StoredError());
270 OnDestErrored(aCx, storedError);
271 return true;
274 // Step 3. Closing must be propagated forward: if source.[[state]] is or
275 // becomes "closed", then
276 if (source->State() == ReadableStream::ReaderState::Closed) {
277 OnSourceClosed(aCx, JS::UndefinedHandleValue);
278 return true;
281 // Step 4. Closing must be propagated backward:
282 // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
283 // or dest.[[state]] is "closed", then
284 if (dest->CloseQueuedOrInFlight() ||
285 dest->State() == WritableStream::WriterState::Closed) {
286 OnDestClosed(aCx, JS::UndefinedHandleValue);
287 return true;
290 return false;
293 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
294 // Steps 14-15.
295 void PipeToPump::Start(JSContext* aCx, AbortSignal* aSignal) {
296 // Step 14. If signal is not undefined,
297 if (aSignal) {
298 // Step 14.1. Let abortAlgorithm be the following steps:
299 // ... This is implemented by RunAbortAlgorithm.
301 // Step 14.2. If signal is aborted, perform abortAlgorithm and
302 // return promise.
303 if (aSignal->Aborted()) {
304 PerformAbortAlgorithm(aCx, aSignal);
305 return;
308 // Step 14.3. Add abortAlgorithm to signal.
309 Follow(aSignal);
312 // Step 15. In parallel but not really; see #905, using reader and writer,
313 // read all chunks from source and write them to dest.
314 // Due to the locking provided by the reader and writer,
315 // the exact manner in which this happens is not observable to author code,
316 // and so there is flexibility in how this is done.
318 // (Constraint) Error and close states must be propagated
320 // Before piping has started, we have to check for source/destination being
321 // errored/closed manually.
322 if (SourceOrDestErroredOrClosed(aCx)) {
323 return;
326 // We use the following two promises to propagate error and close states
327 // during piping.
328 RefPtr<Promise> readerClosed = mReader->ClosedPromise();
329 readerClosed->AppendNativeHandler(new PipeToPumpHandler(
330 this, &PipeToPump::OnSourceClosed, &PipeToPump::OnSourceErrored));
332 // Note: Because we control the destination/writer it should never be closed
333 // after we did the initial check above with SourceOrDestErroredOrClosed.
334 RefPtr<Promise> writerClosed = mWriter->ClosedPromise();
335 writerClosed->AppendNativeHandler(new PipeToPumpHandler(
336 this, &PipeToPump::OnDestClosed, &PipeToPump::OnDestErrored));
338 Read(aCx);
341 class WriteFinishedPromiseHandler final : public PromiseNativeHandler {
342 RefPtr<PipeToPump> mPipeToPump;
343 PipeToPump::ShutdownAction mAction;
344 bool mHasError;
345 JS::Heap<JS::Value> mError;
347 virtual ~WriteFinishedPromiseHandler() { mozilla::DropJSObjects(this); };
349 public:
350 NS_DECL_CYCLE_COLLECTING_ISUPPORTS
351 NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(WriteFinishedPromiseHandler)
353 explicit WriteFinishedPromiseHandler(
354 JSContext* aCx, PipeToPump* aPipeToPump,
355 PipeToPump::ShutdownAction aAction,
356 JS::Handle<mozilla::Maybe<JS::Value>> aError)
357 : mPipeToPump(aPipeToPump), mAction(aAction) {
358 mHasError = aError.isSome();
359 if (mHasError) {
360 mError = *aError;
362 mozilla::HoldJSObjects(this);
365 MOZ_CAN_RUN_SCRIPT void WriteFinished(JSContext* aCx) {
366 RefPtr<PipeToPump> pipeToPump = mPipeToPump; // XXX known-live?
367 JS::Rooted<Maybe<JS::Value>> error(aCx);
368 if (mHasError) {
369 error = Some(mError);
371 pipeToPump->ShutdownWithActionAfterFinishedWrite(aCx, mAction, error);
374 MOZ_CAN_RUN_SCRIPT void ResolvedCallback(JSContext* aCx,
375 JS::Handle<JS::Value> aValue,
376 ErrorResult&) override {
377 WriteFinished(aCx);
380 MOZ_CAN_RUN_SCRIPT void RejectedCallback(JSContext* aCx,
381 JS::Handle<JS::Value> aReason,
382 ErrorResult&) override {
383 WriteFinished(aCx);
387 NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(WriteFinishedPromiseHandler,
388 (mPipeToPump), (mError))
389 NS_IMPL_CYCLE_COLLECTING_ADDREF(WriteFinishedPromiseHandler)
390 NS_IMPL_CYCLE_COLLECTING_RELEASE(WriteFinishedPromiseHandler)
391 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WriteFinishedPromiseHandler)
392 NS_INTERFACE_MAP_ENTRY(nsISupports)
393 NS_INTERFACE_MAP_END
395 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
396 // Shutdown with an action: if any of the above requirements ask to shutdown
397 // with an action action, optionally with an error originalError, then:
398 void PipeToPump::ShutdownWithAction(
399 JSContext* aCx, ShutdownAction aAction,
400 JS::Handle<mozilla::Maybe<JS::Value>> aError) {
401 // Step 1. If shuttingDown is true, abort these substeps.
402 if (mShuttingDown) {
403 return;
406 // Step 2. Set shuttingDown to true.
407 mShuttingDown = true;
409 // Step 3. If dest.[[state]] is "writable" and !
410 // WritableStreamCloseQueuedOrInFlight(dest) is false,
411 RefPtr<WritableStream> dest = mWriter->GetStream();
412 if (dest->State() == WritableStream::WriterState::Writable &&
413 !dest->CloseQueuedOrInFlight()) {
414 // Step 3.1. If any chunks have been read but not yet written, write them to
415 // dest.
416 // Step 3.2. Wait until every chunk that has been read has been
417 // written (i.e. the corresponding promises have settled).
419 // Note: Write requests are processed in order, so when the promise
420 // for the last written chunk is settled all previous chunks have been
421 // written as well.
422 if (mLastWritePromise) {
423 mLastWritePromise->AppendNativeHandler(
424 new WriteFinishedPromiseHandler(aCx, this, aAction, aError));
425 return;
429 // Don't have to wait for last write, immediately continue.
430 ShutdownWithActionAfterFinishedWrite(aCx, aAction, aError);
433 class ShutdownActionFinishedPromiseHandler final : public PromiseNativeHandler {
434 RefPtr<PipeToPump> mPipeToPump;
435 bool mHasError;
436 JS::Heap<JS::Value> mError;
438 virtual ~ShutdownActionFinishedPromiseHandler() {
439 mozilla::DropJSObjects(this);
442 public:
443 NS_DECL_CYCLE_COLLECTING_ISUPPORTS
444 NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(
445 ShutdownActionFinishedPromiseHandler)
447 explicit ShutdownActionFinishedPromiseHandler(
448 JSContext* aCx, PipeToPump* aPipeToPump,
449 JS::Handle<mozilla::Maybe<JS::Value>> aError)
450 : mPipeToPump(aPipeToPump) {
451 mHasError = aError.isSome();
452 if (mHasError) {
453 mError = *aError;
455 mozilla::HoldJSObjects(this);
458 void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue,
459 ErrorResult&) override {
460 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
461 // Step 5. Upon fulfillment of p, finalize, passing along originalError if
462 // it was given.
463 JS::Rooted<Maybe<JS::Value>> error(aCx);
464 if (mHasError) {
465 error = Some(mError);
467 mPipeToPump->Finalize(aCx, error);
470 void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aReason,
471 ErrorResult&) override {
472 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
473 // Step 6. Upon rejection of p with reason newError, finalize with
474 // newError.
475 JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aReason));
476 mPipeToPump->Finalize(aCx, error);
480 NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(ShutdownActionFinishedPromiseHandler,
481 (mPipeToPump), (mError))
482 NS_IMPL_CYCLE_COLLECTING_ADDREF(ShutdownActionFinishedPromiseHandler)
483 NS_IMPL_CYCLE_COLLECTING_RELEASE(ShutdownActionFinishedPromiseHandler)
484 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ShutdownActionFinishedPromiseHandler)
485 NS_INTERFACE_MAP_ENTRY(nsISupports)
486 NS_INTERFACE_MAP_END
488 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
489 // Continuation after Step 3. triggered a promise resolution.
490 void PipeToPump::ShutdownWithActionAfterFinishedWrite(
491 JSContext* aCx, ShutdownAction aAction,
492 JS::Handle<mozilla::Maybe<JS::Value>> aError) {
493 if (!aAction) {
494 // Used to implement shutdown without action. Finalize immediately.
495 Finalize(aCx, aError);
496 return;
499 // Step 4. Let p be the result of performing action.
500 RefPtr<PipeToPump> thisRefPtr = this;
501 ErrorResult rv;
502 RefPtr<Promise> p = aAction(aCx, thisRefPtr, aError, rv);
504 // Error while calling actions above, continue immediately with finalization.
505 if (rv.MaybeSetPendingException(aCx)) {
506 JS::Rooted<Maybe<JS::Value>> someError(aCx);
508 JS::Rooted<JS::Value> error(aCx);
509 if (JS_GetPendingException(aCx, &error)) {
510 someError = Some(error.get());
513 JS_ClearPendingException(aCx);
515 Finalize(aCx, someError);
516 return;
519 // Steps 5-6.
520 p->AppendNativeHandler(
521 new ShutdownActionFinishedPromiseHandler(aCx, this, aError));
524 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown
525 // Shutdown: if any of the above requirements or steps ask to shutdown,
526 // optionally with an error error, then:
527 void PipeToPump::Shutdown(JSContext* aCx,
528 JS::Handle<mozilla::Maybe<JS::Value>> aError) {
529 // Note: We implement "shutdown" in terms of "shutdown with action".
530 // We can observe that when passing along an action that always succeeds
531 // shutdown with action and shutdown have the same behavior, when
532 // Ignoring the potential micro task for the promise that we skip anyway.
533 ShutdownWithAction(aCx, nullptr, aError);
536 // https://streams.spec.whatwg.org/#rs-pipeTo-finalize
537 // Finalize: both forms of shutdown will eventually ask to finalize,
538 // optionally with an error error, which means to perform the following steps:
539 void PipeToPump::Finalize(JSContext* aCx,
540 JS::Handle<mozilla::Maybe<JS::Value>> aError) {
541 IgnoredErrorResult rv;
542 // Step 1. Perform ! WritableStreamDefaultWriterRelease(writer).
543 WritableStreamDefaultWriterRelease(aCx, mWriter, rv);
544 NS_WARNING_ASSERTION(!rv.Failed(),
545 "WritableStreamDefaultWriterRelease should not fail.");
547 // Step 2. If reader implements ReadableStreamBYOBReader,
548 // perform ! ReadableStreamBYOBReaderRelease(reader).
549 // Note: We always use a default reader.
550 MOZ_ASSERT(!mReader->IsBYOB());
552 // Step 3. Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
553 ReadableStreamDefaultReaderRelease(aCx, mReader, rv);
554 NS_WARNING_ASSERTION(!rv.Failed(),
555 "ReadableStreamReaderGenericRelease should not fail.");
557 // Step 3. If signal is not undefined, remove abortAlgorithm from signal.
558 if (IsFollowing()) {
559 Unfollow();
562 // Step 4. If error was given, reject promise with error.
563 if (aError.isSome()) {
564 JS::Rooted<JS::Value> error(aCx, *aError);
565 mPromise->MaybeReject(error);
566 } else {
567 // Step 5. Otherwise, resolve promise with undefined.
568 mPromise->MaybeResolveWithUndefined();
571 // Remove all references.
572 mPromise = nullptr;
573 mReader = nullptr;
574 mWriter = nullptr;
575 mLastWritePromise = nullptr;
576 Unfollow();
579 void PipeToPump::OnReadFulfilled(JSContext* aCx, JS::Handle<JS::Value> aChunk,
580 ErrorResult& aRv) {
581 // (Constraint) Shutdown must stop activity:
582 // if shuttingDown becomes true, the user agent must not initiate further
583 // reads from reader, and must only perform writes of already-read chunks ...
585 // We may reach this point after |On{Source,Dest}{Clos,Error}ed| has responded
586 // to an out-of-band change. Per the comment in |OnSourceErrored|, we want to
587 // allow the implicated shutdown to proceed, and we don't want to interfere
588 // with or additionally alter its operation. Particularly, we don't want to
589 // queue up the successfully-read chunk (if there was one, and this isn't just
590 // reporting "done") to be written: it wasn't "already-read" when that
591 // error/closure happened.
593 // All specified reactions to a closure/error invoke either the shutdown, or
594 // shutdown with an action, algorithms. Those algorithms each abort if either
595 // shutdown algorithm has already been invoked. So we check for shutdown here
596 // in case of asynchronous closure/error and abort if shutdown has already
597 // started (and possibly finished).
599 // TODO: Implement the eventual resolution from
600 // https://github.com/whatwg/streams/issues/1207
601 if (mShuttingDown) {
602 return;
605 RefPtr<WritableStreamDefaultWriter> writer = mWriter;
606 mLastWritePromise =
607 WritableStreamDefaultWriterWrite(aCx, writer, aChunk, aRv);
608 if (aRv.Failed()) {
609 mLastWritePromise = nullptr;
610 return;
613 mLastWritePromise->AppendNativeHandler(
614 new PipeToPumpHandler(this, nullptr, &PipeToPump::OnDestErrored));
616 // Last read has finished, so it's time to start reading again.
617 Read(aCx);
620 void PipeToPump::OnWriterReady(JSContext* aCx, JS::Handle<JS::Value>) {
621 // Writer is ready again (i.e. backpressure was resolved), so read.
622 Read(aCx);
625 struct PipeToReadRequest : public ReadRequest {
626 public:
627 NS_DECL_ISUPPORTS_INHERITED
628 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(PipeToReadRequest, ReadRequest)
630 RefPtr<PipeToPump> mPipeToPump;
632 explicit PipeToReadRequest(PipeToPump* aPipeToPump)
633 : mPipeToPump(aPipeToPump) {}
635 MOZ_CAN_RUN_SCRIPT_BOUNDARY void ChunkSteps(JSContext* aCx,
636 JS::Handle<JS::Value> aChunk,
637 ErrorResult& aRv) override {
638 RefPtr<PipeToPump> pipeToPump = mPipeToPump; // XXX known live?
639 pipeToPump->OnReadFulfilled(aCx, aChunk, aRv);
642 // The reader's closed promise handlers will already call OnSourceClosed/
643 // OnSourceErrored, so these steps can just be ignored.
644 void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {}
645 void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
646 ErrorResult& aRv) override {}
648 protected:
649 virtual ~PipeToReadRequest() = default;
652 NS_IMPL_CYCLE_COLLECTION_CLASS(PipeToReadRequest)
654 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN_INHERITED(PipeToReadRequest, ReadRequest)
655 NS_IMPL_CYCLE_COLLECTION_UNLINK(mPipeToPump)
656 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
658 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(PipeToReadRequest,
659 ReadRequest)
660 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mPipeToPump)
661 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
663 NS_IMPL_ADDREF_INHERITED(PipeToReadRequest, ReadRequest)
664 NS_IMPL_RELEASE_INHERITED(PipeToReadRequest, ReadRequest)
666 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToReadRequest)
667 NS_INTERFACE_MAP_END_INHERITING(ReadRequest)
669 void PipeToPump::Read(JSContext* aCx) {
670 #ifdef DEBUG
671 mReadChunk = true;
672 #endif
674 // (Constraint) Shutdown must stop activity:
675 // If shuttingDown becomes true, the user agent must not initiate
676 // further reads from reader
677 if (mShuttingDown) {
678 return;
681 // (Constraint) Backpressure must be enforced:
682 // While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null,
683 // the user agent must not read from reader.
684 Nullable<double> desiredSize =
685 WritableStreamDefaultWriterGetDesiredSize(mWriter);
686 if (desiredSize.IsNull()) {
687 // This means the writer has errored. This is going to be handled
688 // by the writer closed promise.
689 return;
692 if (desiredSize.Value() <= 0) {
693 // Wait for the writer to become ready before reading more data from
694 // the reader. We don't care about rejections here, because those are
695 // already handled by the writer closed promise.
696 RefPtr<Promise> readyPromise = mWriter->Ready();
697 readyPromise->AppendNativeHandler(
698 new PipeToPumpHandler(this, &PipeToPump::OnWriterReady, nullptr));
699 return;
702 RefPtr<ReadableStreamDefaultReader> reader = mReader;
703 RefPtr<ReadRequest> request = new PipeToReadRequest(this);
704 ErrorResult rv;
705 ReadableStreamDefaultReaderRead(aCx, reader, request, rv);
706 if (rv.MaybeSetPendingException(aCx)) {
707 // XXX It's actually not quite obvious what we should do here.
708 // We've got an error during reading, so on the surface it seems logical
709 // to invoke `OnSourceErrored`. However in certain cases the required
710 // condition > source.[[state]] is or becomes "errored" < won't actually
711 // happen i.e. when `WritableStreamDefaultWriterWrite` called from
712 // `OnReadFulfilled` (via PipeToReadRequest::ChunkSteps) fails in
713 // a synchronous fashion.
714 JS::Rooted<JS::Value> error(aCx);
715 JS::Rooted<Maybe<JS::Value>> someError(aCx);
717 // The error was moved to the JSContext by MaybeSetPendingException.
718 if (JS_GetPendingException(aCx, &error)) {
719 someError = Some(error.get());
722 JS_ClearPendingException(aCx);
724 Shutdown(aCx, someError);
728 // Step 3. Closing must be propagated forward: if source.[[state]] is or
729 // becomes "closed", then
730 void PipeToPump::OnSourceClosed(JSContext* aCx, JS::Handle<JS::Value>) {
731 // Step 3.1. If preventClose is false, shutdown with an action of
732 // ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
733 if (!mPreventClose) {
734 ShutdownWithAction(
735 aCx,
736 [](JSContext* aCx, PipeToPump* aPipeToPump,
737 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
738 MOZ_CAN_RUN_SCRIPT {
739 RefPtr<WritableStreamDefaultWriter> writer = aPipeToPump->mWriter;
740 return WritableStreamDefaultWriterCloseWithErrorPropagation(
741 aCx, writer, aRv);
743 JS::NothingHandleValue);
744 } else {
745 // Step 3.2 Otherwise, shutdown.
746 Shutdown(aCx, JS::NothingHandleValue);
750 // Step 1. Errors must be propagated forward: if source.[[state]] is or
751 // becomes "errored", then
752 void PipeToPump::OnSourceErrored(JSContext* aCx,
753 JS::Handle<JS::Value> aSourceStoredError) {
754 // If |source| becomes errored not during a pending read, it's clear we must
755 // react immediately.
757 // But what if |source| becomes errored *during* a pending read? Should this
758 // first error, or the pending-read second error, predominate? Two semantics
759 // are possible when |source|/|dest| become closed or errored while there's a
760 // pending read:
762 // 1. Wait until the read fulfills or rejects, then respond to the
763 // closure/error without regard to the read having fulfilled or rejected.
764 // (This will simply not react to the read being rejected, or it will
765 // queue up the read chunk to be written during shutdown.)
766 // 2. React to the closure/error immediately per "Error and close states
767 // must be propagated". Then when the read fulfills or rejects later, do
768 // nothing.
770 // The spec doesn't clearly require either semantics. It requires that
771 // *already-read* chunks be written (at least if |dest| didn't become errored
772 // or closed such that no further writes can occur). But it's silent as to
773 // not-fully-read chunks. (These semantic differences may only be observable
774 // with very carefully constructed readable/writable streams.)
776 // It seems best, generally, to react to the temporally-earliest problem that
777 // arises, so we implement option #2. (Blink, in contrast, currently
778 // implements option #1.)
780 // All specified reactions to a closure/error invoke either the shutdown, or
781 // shutdown with an action, algorithms. Those algorithms each abort if either
782 // shutdown algorithm has already been invoked. So we don't need to do
783 // anything special here to deal with a pending read.
785 // TODO: Implement the eventual resolution from
786 // https://github.com/whatwg/streams/issues/1207
788 // Step 1.1 If preventAbort is false, shutdown with an action of
789 // ! WritableStreamAbort(dest, source.[[storedError]])
790 // and with source.[[storedError]].
791 JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aSourceStoredError));
792 if (!mPreventAbort) {
793 ShutdownWithAction(
794 aCx,
795 [](JSContext* aCx, PipeToPump* aPipeToPump,
796 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
797 MOZ_CAN_RUN_SCRIPT {
798 JS::Rooted<JS::Value> error(aCx, *aError);
799 RefPtr<WritableStream> dest = aPipeToPump->mWriter->GetStream();
800 return WritableStreamAbort(aCx, dest, error, aRv);
802 error);
803 } else {
804 // Step 1.1. Otherwise, shutdown with source.[[storedError]].
805 Shutdown(aCx, error);
809 // Step 4. Closing must be propagated backward:
810 // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
811 // or dest.[[state]] is "closed", then
812 void PipeToPump::OnDestClosed(JSContext* aCx, JS::Handle<JS::Value>) {
813 // Step 4.1. Assert: no chunks have been read or written.
814 // Note: No reading automatically implies no writing.
815 // In a perfect world OnDestClosed would only be called before we start
816 // piping, because afterwards the writer has an exclusive lock on the stream.
817 // In reality the closed promise can still be resolved after we release
818 // the lock on the writer in Finalize.
819 if (mShuttingDown) {
820 return;
822 MOZ_ASSERT(!mReadChunk);
824 // Step 4.2. Let destClosed be a new TypeError.
825 JS::Rooted<Maybe<JS::Value>> destClosed(aCx, Nothing());
827 ErrorResult rv;
828 rv.ThrowTypeError("Cannot pipe to closed stream");
829 JS::Rooted<JS::Value> error(aCx);
830 bool ok = ToJSValue(aCx, std::move(rv), &error);
831 MOZ_RELEASE_ASSERT(ok, "must be ok");
832 destClosed = Some(error.get());
835 // Step 4.3. If preventCancel is false, shutdown with an action of
836 // ! ReadableStreamCancel(source, destClosed) and with destClosed.
837 if (!mPreventCancel) {
838 ShutdownWithAction(
839 aCx,
840 [](JSContext* aCx, PipeToPump* aPipeToPump,
841 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
842 MOZ_CAN_RUN_SCRIPT {
843 JS::Rooted<JS::Value> error(aCx, *aError);
844 RefPtr<ReadableStream> dest = aPipeToPump->mReader->GetStream();
845 return ReadableStreamCancel(aCx, dest, error, aRv);
847 destClosed);
848 } else {
849 // Step 4.4. Otherwise, shutdown with destClosed.
850 Shutdown(aCx, destClosed);
854 // Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes
855 // "errored", then
856 void PipeToPump::OnDestErrored(JSContext* aCx,
857 JS::Handle<JS::Value> aDestStoredError) {
858 // Step 2.1. If preventCancel is false, shutdown with an action of
859 // ! ReadableStreamCancel(source, dest.[[storedError]])
860 // and with dest.[[storedError]].
861 JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aDestStoredError));
862 if (!mPreventCancel) {
863 ShutdownWithAction(
864 aCx,
865 [](JSContext* aCx, PipeToPump* aPipeToPump,
866 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
867 MOZ_CAN_RUN_SCRIPT {
868 JS::Rooted<JS::Value> error(aCx, *aError);
869 RefPtr<ReadableStream> dest = aPipeToPump->mReader->GetStream();
870 return ReadableStreamCancel(aCx, dest, error, aRv);
872 error);
873 } else {
874 // Step 2.1. Otherwise, shutdown with dest.[[storedError]].
875 Shutdown(aCx, error);
879 NS_IMPL_CYCLE_COLLECTION_CLASS(PipeToPump)
880 NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPump)
881 NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPump)
882 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPump)
883 NS_INTERFACE_MAP_ENTRY(nsISupports)
884 NS_INTERFACE_MAP_END
886 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(PipeToPump)
887 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mPromise)
888 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader)
889 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mWriter)
890 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mLastWritePromise)
891 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
893 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(PipeToPump)
894 NS_IMPL_CYCLE_COLLECTION_UNLINK(mPromise)
895 NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader)
896 NS_IMPL_CYCLE_COLLECTION_UNLINK(mWriter)
897 NS_IMPL_CYCLE_COLLECTION_UNLINK(mLastWritePromise)
898 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
900 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
901 already_AddRefed<Promise> ReadableStreamPipeTo(
902 ReadableStream* aSource, WritableStream* aDest, bool aPreventClose,
903 bool aPreventAbort, bool aPreventCancel, AbortSignal* aSignal,
904 mozilla::ErrorResult& aRv) {
905 // Step 1. Assert: source implements ReadableStream. (Implicit)
906 // Step 2. Assert: dest implements WritableStream. (Implicit)
907 // Step 3. Assert: preventClose, preventAbort, and preventCancel are all
908 // booleans (Implicit)
909 // Step 4. If signal was not given, let signal be
910 // undefined. (Implicit)
911 // Step 5. Assert: either signal is undefined, or signal
912 // implements AbortSignal. (Implicit)
913 // Step 6. Assert: !IsReadableStreamLocked(source) is false.
914 MOZ_ASSERT(!IsReadableStreamLocked(aSource));
916 // Step 7. Assert: !IsWritableStreamLocked(dest) is false.
917 MOZ_ASSERT(!IsWritableStreamLocked(aDest));
919 AutoJSAPI jsapi;
920 if (!jsapi.Init(aSource->GetParentObject())) {
921 aRv.ThrowUnknownError("Internal error");
922 return nullptr;
924 JSContext* cx = jsapi.cx();
926 // Step 8. If source.[[controller]] implements ReadableByteStreamController,
927 // let reader be either !AcquireReadableStreamBYOBReader(source) or
928 // !AcquireReadableStreamDefaultReader(source), at the user agent’s
929 // discretion.
930 // Step 9. Otherwise, let reader be
931 // !AcquireReadableStreamDefaultReader(source).
933 // Note: In the interests of simplicity, we choose here to always acquire
934 // a default reader.
935 RefPtr<ReadableStreamDefaultReader> reader =
936 AcquireReadableStreamDefaultReader(aSource, aRv);
937 if (aRv.Failed()) {
938 return nullptr;
941 // Step 10. Let writer be ! AcquireWritableStreamDefaultWriter(dest).
942 RefPtr<WritableStreamDefaultWriter> writer =
943 AcquireWritableStreamDefaultWriter(aDest, aRv);
944 if (aRv.Failed()) {
945 return nullptr;
948 // Step 11. Set source.[[disturbed]] to true.
949 aSource->SetDisturbed(true);
951 // Step 12. Let shuttingDown be false.
952 // Note: PipeToPump ensures this by construction.
954 // Step 13. Let promise be a new promise.
955 RefPtr<Promise> promise = Promise::Create(aSource->GetParentObject(), aRv);
956 if (aRv.Failed()) {
957 return nullptr;
960 // Steps 14-15.
961 RefPtr<PipeToPump> pump = new PipeToPump(
962 promise, reader, writer, aPreventClose, aPreventAbort, aPreventCancel);
963 pump->Start(cx, aSignal);
965 // Step 16. Return promise.
966 return promise.forget();
969 } // namespace mozilla::dom