Bug 1806483 - Enable TSAN cppunittests by default. r=jmaher
[gecko.git] / dom / streams / ReadableStreamPipeTo.cpp
blob24cb1d7f08ffb2475b0d3e8f83ed40d724629da2
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/dom/AbortFollower.h"
8 #include "mozilla/dom/AbortSignal.h"
9 #include "mozilla/dom/ReadableStream.h"
10 #include "mozilla/dom/ReadableStreamDefaultReader.h"
11 #include "mozilla/dom/ReadableStreamPipeTo.h"
12 #include "mozilla/dom/WritableStream.h"
13 #include "mozilla/dom/WritableStreamDefaultWriter.h"
14 #include "mozilla/dom/Promise.h"
15 #include "mozilla/dom/PromiseNativeHandler.h"
16 #include "mozilla/AlreadyAddRefed.h"
17 #include "mozilla/ErrorResult.h"
18 #include "nsCycleCollectionParticipant.h"
19 #include "nsISupportsImpl.h"
21 #include "js/Exception.h"
23 namespace mozilla::dom {
25 struct PipeToReadRequest;
26 class WriteFinishedPromiseHandler;
27 class ShutdownActionFinishedPromiseHandler;
29 // https://streams.spec.whatwg.org/#readable-stream-pipe-to (Steps 14-15.)
31 // This class implements everything that is required to read all chunks from
32 // the reader (source) and write them to writer (destination), while
33 // following the constraints given in the spec using our implementation-defined
34 // behavior.
36 // The cycle-collected references look roughly like this:
37 // clang-format off
39 // Closed promise <-- ReadableStreamDefaultReader <--> ReadableStream
40 // | ^ |
41 // |(PromiseHandler) |(mReader) |(ReadRequest)
42 // | | |
43 // |-------------> PipeToPump <-------
44 // ^ | |
45 // |---------------| | |
46 // | | |-------(mLastWrite) -------->
47 // |(PromiseHandler) | |< ---- (PromiseHandler) ---- Promise
48 // | | ^
49 // | |(mWriter) |(mWriteRequests)
50 // | v |
51 // Closed promise <-- WritableStreamDefaultWriter <--------> WritableStream
53 // clang-format on
54 class PipeToPump final : public AbortFollower {
55 NS_DECL_CYCLE_COLLECTING_ISUPPORTS
56 NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPump)
58 friend struct PipeToReadRequest;
59 friend class WriteFinishedPromiseHandler;
60 friend class ShutdownActionFinishedPromiseHandler;
62 PipeToPump(Promise* aPromise, ReadableStreamDefaultReader* aReader,
63 WritableStreamDefaultWriter* aWriter, bool aPreventClose,
64 bool aPreventAbort, bool aPreventCancel)
65 : mPromise(aPromise),
66 mReader(aReader),
67 mWriter(aWriter),
68 mPreventClose(aPreventClose),
69 mPreventAbort(aPreventAbort),
70 mPreventCancel(aPreventCancel) {}
72 MOZ_CAN_RUN_SCRIPT void Start(JSContext* aCx, AbortSignal* aSignal);
74 MOZ_CAN_RUN_SCRIPT_BOUNDARY void RunAbortAlgorithm() override;
76 private:
77 ~PipeToPump() override = default;
79 MOZ_CAN_RUN_SCRIPT void PerformAbortAlgorithm(JSContext* aCx,
80 AbortSignalImpl* aSignal);
82 MOZ_CAN_RUN_SCRIPT bool SourceOrDestErroredOrClosed(JSContext* aCx);
84 using ShutdownAction = already_AddRefed<Promise> (*)(
85 JSContext*, PipeToPump*, JS::Handle<mozilla::Maybe<JS::Value>>,
86 ErrorResult&);
88 MOZ_CAN_RUN_SCRIPT void ShutdownWithAction(
89 JSContext* aCx, ShutdownAction aAction,
90 JS::Handle<mozilla::Maybe<JS::Value>> aError);
91 MOZ_CAN_RUN_SCRIPT void ShutdownWithActionAfterFinishedWrite(
92 JSContext* aCx, ShutdownAction aAction,
93 JS::Handle<mozilla::Maybe<JS::Value>> aError);
95 MOZ_CAN_RUN_SCRIPT void Shutdown(
96 JSContext* aCx, JS::Handle<mozilla::Maybe<JS::Value>> aError);
98 void Finalize(JSContext* aCx, JS::Handle<mozilla::Maybe<JS::Value>> aError);
100 MOZ_CAN_RUN_SCRIPT void OnReadFulfilled(JSContext* aCx,
101 JS::Handle<JS::Value> aChunk,
102 ErrorResult& aRv);
103 MOZ_CAN_RUN_SCRIPT void OnWriterReady(JSContext* aCx, JS::Handle<JS::Value>);
104 MOZ_CAN_RUN_SCRIPT void Read(JSContext* aCx);
106 MOZ_CAN_RUN_SCRIPT void OnSourceClosed(JSContext* aCx, JS::Handle<JS::Value>);
107 MOZ_CAN_RUN_SCRIPT void OnSourceErrored(JSContext* aCx,
108 JS::Handle<JS::Value> aError);
110 MOZ_CAN_RUN_SCRIPT void OnDestClosed(JSContext* aCx, JS::Handle<JS::Value>);
111 MOZ_CAN_RUN_SCRIPT void OnDestErrored(JSContext* aCx,
112 JS::Handle<JS::Value> aError);
114 RefPtr<Promise> mPromise;
115 RefPtr<ReadableStreamDefaultReader> mReader;
116 RefPtr<WritableStreamDefaultWriter> mWriter;
117 RefPtr<Promise> mLastWritePromise;
118 const bool mPreventClose;
119 const bool mPreventAbort;
120 const bool mPreventCancel;
121 bool mShuttingDown = false;
122 #ifdef DEBUG
123 bool mReadChunk = false;
124 #endif
127 // This is a helper class for PipeToPump that allows it to attach
128 // member functions as promise handlers.
129 class PipeToPumpHandler final : public PromiseNativeHandler {
130 virtual ~PipeToPumpHandler() = default;
132 using FunPtr = void (PipeToPump::*)(JSContext*, JS::Handle<JS::Value>);
134 RefPtr<PipeToPump> mPipeToPump;
135 FunPtr mResolved;
136 FunPtr mRejected;
138 public:
139 NS_DECL_CYCLE_COLLECTING_ISUPPORTS
140 NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPumpHandler)
142 explicit PipeToPumpHandler(PipeToPump* aPipeToPump, FunPtr aResolved,
143 FunPtr aRejected)
144 : mPipeToPump(aPipeToPump), mResolved(aResolved), mRejected(aRejected) {}
146 void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue,
147 ErrorResult&) override {
148 if (mResolved) {
149 (mPipeToPump->*mResolved)(aCx, aValue);
153 void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aReason,
154 ErrorResult&) override {
155 if (mRejected) {
156 (mPipeToPump->*mRejected)(aCx, aReason);
161 NS_IMPL_CYCLE_COLLECTION(PipeToPumpHandler, mPipeToPump)
162 NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPumpHandler)
163 NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPumpHandler)
164 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPumpHandler)
165 NS_INTERFACE_MAP_ENTRY(nsISupports)
166 NS_INTERFACE_MAP_END
168 void PipeToPump::RunAbortAlgorithm() {
169 AutoJSAPI jsapi;
170 if (!jsapi.Init(mReader->GetStream()->GetParentObject())) {
171 NS_WARNING(
172 "Failed to initialize AutoJSAPI in PipeToPump::RunAbortAlgorithm");
173 return;
175 JSContext* cx = jsapi.cx();
177 RefPtr<AbortSignalImpl> signal = Signal();
178 PerformAbortAlgorithm(cx, signal);
181 void PipeToPump::PerformAbortAlgorithm(JSContext* aCx,
182 AbortSignalImpl* aSignal) {
183 MOZ_ASSERT(aSignal->Aborted());
185 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
186 // Step 14.1. Let abortAlgorithm be the following steps:
187 // Note: All the following steps are 14.1.xx
189 // Step 1. Let error be signal’s abort reason.
190 JS::Rooted<JS::Value> error(aCx);
191 aSignal->GetReason(aCx, &error);
193 auto action = [](JSContext* aCx, PipeToPump* aPipeToPump,
194 JS::Handle<mozilla::Maybe<JS::Value>> aError,
195 ErrorResult& aRv) MOZ_CAN_RUN_SCRIPT {
196 JS::Rooted<JS::Value> error(aCx, *aError);
198 // Step 2. Let actions be an empty ordered set.
199 nsTArray<RefPtr<Promise>> actions;
201 // Step 3. If preventAbort is false, append the following action to actions:
202 if (!aPipeToPump->mPreventAbort) {
203 RefPtr<WritableStream> dest = aPipeToPump->mWriter->GetStream();
205 // Step 3.1. If dest.[[state]] is "writable", return !
206 // WritableStreamAbort(dest, error).
207 if (dest->State() == WritableStream::WriterState::Writable) {
208 RefPtr<Promise> p = WritableStreamAbort(aCx, dest, error, aRv);
209 if (aRv.Failed()) {
210 return already_AddRefed<Promise>();
212 actions.AppendElement(p);
215 // Step 3.2. Otherwise, return a promise resolved with undefined.
216 // Note: This is basically a no-op.
219 // Step 4. If preventCancel is false, append the following action action to
220 // actions:
221 if (!aPipeToPump->mPreventCancel) {
222 RefPtr<ReadableStream> source = aPipeToPump->mReader->GetStream();
224 // Step 4.1. If source.[[state]] is "readable", return !
225 // ReadableStreamCancel(source, error).
226 if (source->State() == ReadableStream::ReaderState::Readable) {
227 RefPtr<Promise> p = ReadableStreamCancel(aCx, source, error, aRv);
228 if (aRv.Failed()) {
229 return already_AddRefed<Promise>();
231 actions.AppendElement(p);
234 // Step 4.2. Otherwise, return a promise resolved with undefined.
235 // No-op again.
238 // Step 5. .. action consisting of getting a promise to wait for
239 // all of the actions in actions ...
240 return Promise::All(aCx, actions, aRv);
243 // Step 5. Shutdown with an action consisting of getting a promise to wait for
244 // all of the actions in actions, and with error.
245 JS::Rooted<Maybe<JS::Value>> someError(aCx, Some(error.get()));
246 ShutdownWithAction(aCx, action, someError);
249 bool PipeToPump::SourceOrDestErroredOrClosed(JSContext* aCx) {
250 // (Constraint) Error and close states must be propagated:
251 // the following conditions must be applied in order.
252 RefPtr<ReadableStream> source = mReader->GetStream();
253 RefPtr<WritableStream> dest = mWriter->GetStream();
255 // Step 1. Errors must be propagated forward: if source.[[state]] is or
256 // becomes "errored", then
257 if (source->State() == ReadableStream::ReaderState::Errored) {
258 JS::Rooted<JS::Value> storedError(aCx, source->StoredError());
259 OnSourceErrored(aCx, storedError);
260 return true;
263 // Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes
264 // "errored", then
265 if (dest->State() == WritableStream::WriterState::Errored) {
266 JS::Rooted<JS::Value> storedError(aCx, dest->StoredError());
267 OnDestErrored(aCx, storedError);
268 return true;
271 // Step 3. Closing must be propagated forward: if source.[[state]] is or
272 // becomes "closed", then
273 if (source->State() == ReadableStream::ReaderState::Closed) {
274 OnSourceClosed(aCx, JS::UndefinedHandleValue);
275 return true;
278 // Step 4. Closing must be propagated backward:
279 // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
280 // or dest.[[state]] is "closed", then
281 if (dest->CloseQueuedOrInFlight() ||
282 dest->State() == WritableStream::WriterState::Closed) {
283 OnDestClosed(aCx, JS::UndefinedHandleValue);
284 return true;
287 return false;
290 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
291 // Steps 14-15.
292 void PipeToPump::Start(JSContext* aCx, AbortSignal* aSignal) {
293 // Step 14. If signal is not undefined,
294 if (aSignal) {
295 // Step 14.1. Let abortAlgorithm be the following steps:
296 // ... This is implemented by RunAbortAlgorithm.
298 // Step 14.2. If signal is aborted, perform abortAlgorithm and
299 // return promise.
300 if (aSignal->Aborted()) {
301 PerformAbortAlgorithm(aCx, aSignal);
302 return;
305 // Step 14.3. Add abortAlgorithm to signal.
306 Follow(aSignal);
309 // Step 15. In parallel but not really; see #905, using reader and writer,
310 // read all chunks from source and write them to dest.
311 // Due to the locking provided by the reader and writer,
312 // the exact manner in which this happens is not observable to author code,
313 // and so there is flexibility in how this is done.
315 // (Constraint) Error and close states must be propagated
317 // Before piping has started, we have to check for source/destination being
318 // errored/closed manually.
319 if (SourceOrDestErroredOrClosed(aCx)) {
320 return;
323 // We use the following two promises to propagate error and close states
324 // during piping.
325 RefPtr<Promise> readerClosed = mReader->ClosedPromise();
326 readerClosed->AppendNativeHandler(new PipeToPumpHandler(
327 this, &PipeToPump::OnSourceClosed, &PipeToPump::OnSourceErrored));
329 // Note: Because we control the destination/writer it should never be closed
330 // after we did the initial check above with SourceOrDestErroredOrClosed.
331 RefPtr<Promise> writerClosed = mWriter->ClosedPromise();
332 writerClosed->AppendNativeHandler(new PipeToPumpHandler(
333 this, &PipeToPump::OnDestClosed, &PipeToPump::OnDestErrored));
335 Read(aCx);
338 class WriteFinishedPromiseHandler final : public PromiseNativeHandler {
339 RefPtr<PipeToPump> mPipeToPump;
340 PipeToPump::ShutdownAction mAction;
341 bool mHasError;
342 JS::Heap<JS::Value> mError;
344 virtual ~WriteFinishedPromiseHandler() { mozilla::DropJSObjects(this); };
346 public:
347 NS_DECL_CYCLE_COLLECTING_ISUPPORTS
348 NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(WriteFinishedPromiseHandler)
350 explicit WriteFinishedPromiseHandler(
351 JSContext* aCx, PipeToPump* aPipeToPump,
352 PipeToPump::ShutdownAction aAction,
353 JS::Handle<mozilla::Maybe<JS::Value>> aError)
354 : mPipeToPump(aPipeToPump), mAction(aAction) {
355 mHasError = aError.isSome();
356 if (mHasError) {
357 mError = *aError;
359 mozilla::HoldJSObjects(this);
362 MOZ_CAN_RUN_SCRIPT void WriteFinished(JSContext* aCx) {
363 RefPtr<PipeToPump> pipeToPump = mPipeToPump; // XXX known-live?
364 JS::Rooted<Maybe<JS::Value>> error(aCx);
365 if (mHasError) {
366 error = Some(mError);
368 pipeToPump->ShutdownWithActionAfterFinishedWrite(aCx, mAction, error);
371 MOZ_CAN_RUN_SCRIPT void ResolvedCallback(JSContext* aCx,
372 JS::Handle<JS::Value> aValue,
373 ErrorResult&) override {
374 WriteFinished(aCx);
377 MOZ_CAN_RUN_SCRIPT void RejectedCallback(JSContext* aCx,
378 JS::Handle<JS::Value> aReason,
379 ErrorResult&) override {
380 WriteFinished(aCx);
384 NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(WriteFinishedPromiseHandler,
385 (mPipeToPump), (mError))
386 NS_IMPL_CYCLE_COLLECTING_ADDREF(WriteFinishedPromiseHandler)
387 NS_IMPL_CYCLE_COLLECTING_RELEASE(WriteFinishedPromiseHandler)
388 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WriteFinishedPromiseHandler)
389 NS_INTERFACE_MAP_ENTRY(nsISupports)
390 NS_INTERFACE_MAP_END
392 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
393 // Shutdown with an action: if any of the above requirements ask to shutdown
394 // with an action action, optionally with an error originalError, then:
395 void PipeToPump::ShutdownWithAction(
396 JSContext* aCx, ShutdownAction aAction,
397 JS::Handle<mozilla::Maybe<JS::Value>> aError) {
398 // Step 1. If shuttingDown is true, abort these substeps.
399 if (mShuttingDown) {
400 return;
403 // Step 2. Set shuttingDown to true.
404 mShuttingDown = true;
406 // Step 3. If dest.[[state]] is "writable" and !
407 // WritableStreamCloseQueuedOrInFlight(dest) is false,
408 RefPtr<WritableStream> dest = mWriter->GetStream();
409 if (dest->State() == WritableStream::WriterState::Writable &&
410 !dest->CloseQueuedOrInFlight()) {
411 // Step 3.1. If any chunks have been read but not yet written, write them to
412 // dest.
413 // Step 3.2. Wait until every chunk that has been read has been
414 // written (i.e. the corresponding promises have settled).
416 // Note: Write requests are processed in order, so when the promise
417 // for the last written chunk is settled all previous chunks have been
418 // written as well.
419 if (mLastWritePromise) {
420 mLastWritePromise->AppendNativeHandler(
421 new WriteFinishedPromiseHandler(aCx, this, aAction, aError));
422 return;
426 // Don't have to wait for last write, immediately continue.
427 ShutdownWithActionAfterFinishedWrite(aCx, aAction, aError);
430 class ShutdownActionFinishedPromiseHandler final : public PromiseNativeHandler {
431 RefPtr<PipeToPump> mPipeToPump;
432 bool mHasError;
433 JS::Heap<JS::Value> mError;
435 virtual ~ShutdownActionFinishedPromiseHandler() {
436 mozilla::DropJSObjects(this);
439 public:
440 NS_DECL_CYCLE_COLLECTING_ISUPPORTS
441 NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(
442 ShutdownActionFinishedPromiseHandler)
444 explicit ShutdownActionFinishedPromiseHandler(
445 JSContext* aCx, PipeToPump* aPipeToPump,
446 JS::Handle<mozilla::Maybe<JS::Value>> aError)
447 : mPipeToPump(aPipeToPump) {
448 mHasError = aError.isSome();
449 if (mHasError) {
450 mError = *aError;
452 mozilla::HoldJSObjects(this);
455 void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue,
456 ErrorResult&) override {
457 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
458 // Step 5. Upon fulfillment of p, finalize, passing along originalError if
459 // it was given.
460 JS::Rooted<Maybe<JS::Value>> error(aCx);
461 if (mHasError) {
462 error = Some(mError);
464 mPipeToPump->Finalize(aCx, error);
467 void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aReason,
468 ErrorResult&) override {
469 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
470 // Step 6. Upon rejection of p with reason newError, finalize with
471 // newError.
472 JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aReason));
473 mPipeToPump->Finalize(aCx, error);
477 NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(ShutdownActionFinishedPromiseHandler,
478 (mPipeToPump), (mError))
479 NS_IMPL_CYCLE_COLLECTING_ADDREF(ShutdownActionFinishedPromiseHandler)
480 NS_IMPL_CYCLE_COLLECTING_RELEASE(ShutdownActionFinishedPromiseHandler)
481 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ShutdownActionFinishedPromiseHandler)
482 NS_INTERFACE_MAP_ENTRY(nsISupports)
483 NS_INTERFACE_MAP_END
485 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
486 // Continuation after Step 3. triggered a promise resolution.
487 void PipeToPump::ShutdownWithActionAfterFinishedWrite(
488 JSContext* aCx, ShutdownAction aAction,
489 JS::Handle<mozilla::Maybe<JS::Value>> aError) {
490 if (!aAction) {
491 // Used to implement shutdown without action. Finalize immediately.
492 Finalize(aCx, aError);
493 return;
496 // Step 4. Let p be the result of performing action.
497 RefPtr<PipeToPump> thisRefPtr = this;
498 ErrorResult rv;
499 RefPtr<Promise> p = aAction(aCx, thisRefPtr, aError, rv);
501 // Error while calling actions above, continue immediately with finalization.
502 if (rv.MaybeSetPendingException(aCx)) {
503 JS::Rooted<Maybe<JS::Value>> someError(aCx);
505 JS::Rooted<JS::Value> error(aCx);
506 if (JS_GetPendingException(aCx, &error)) {
507 someError = Some(error.get());
510 JS_ClearPendingException(aCx);
512 Finalize(aCx, someError);
513 return;
516 // Steps 5-6.
517 p->AppendNativeHandler(
518 new ShutdownActionFinishedPromiseHandler(aCx, this, aError));
521 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown
522 // Shutdown: if any of the above requirements or steps ask to shutdown,
523 // optionally with an error error, then:
524 void PipeToPump::Shutdown(JSContext* aCx,
525 JS::Handle<mozilla::Maybe<JS::Value>> aError) {
526 // Note: We implement "shutdown" in terms of "shutdown with action".
527 // We can observe that when passing along an action that always succeeds
528 // shutdown with action and shutdown have the same behavior, when
529 // Ignoring the potential micro task for the promise that we skip anyway.
530 ShutdownWithAction(aCx, nullptr, aError);
533 // https://streams.spec.whatwg.org/#rs-pipeTo-finalize
534 // Finalize: both forms of shutdown will eventually ask to finalize,
535 // optionally with an error error, which means to perform the following steps:
536 void PipeToPump::Finalize(JSContext* aCx,
537 JS::Handle<mozilla::Maybe<JS::Value>> aError) {
538 IgnoredErrorResult rv;
539 // Step 1. Perform ! WritableStreamDefaultWriterRelease(writer).
540 WritableStreamDefaultWriterRelease(aCx, mWriter, rv);
541 NS_WARNING_ASSERTION(!rv.Failed(),
542 "WritableStreamDefaultWriterRelease should not fail.");
544 // Step 2. If reader implements ReadableStreamBYOBReader,
545 // perform ! ReadableStreamBYOBReaderRelease(reader).
546 // Note: We always use a default reader.
547 MOZ_ASSERT(!mReader->IsBYOB());
549 // Step 3. Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
550 ReadableStreamDefaultReaderRelease(aCx, mReader, rv);
551 NS_WARNING_ASSERTION(!rv.Failed(),
552 "ReadableStreamReaderGenericRelease should not fail.");
554 // Step 3. If signal is not undefined, remove abortAlgorithm from signal.
555 if (IsFollowing()) {
556 Unfollow();
559 // Step 4. If error was given, reject promise with error.
560 if (aError.isSome()) {
561 JS::Rooted<JS::Value> error(aCx, *aError);
562 mPromise->MaybeReject(error);
563 } else {
564 // Step 5. Otherwise, resolve promise with undefined.
565 mPromise->MaybeResolveWithUndefined();
568 // Remove all references.
569 mPromise = nullptr;
570 mReader = nullptr;
571 mWriter = nullptr;
572 mLastWritePromise = nullptr;
573 Unfollow();
576 void PipeToPump::OnReadFulfilled(JSContext* aCx, JS::Handle<JS::Value> aChunk,
577 ErrorResult& aRv) {
578 // (Constraint) Shutdown must stop activity:
579 // if shuttingDown becomes true, the user agent must not initiate further
580 // reads from reader, and must only perform writes of already-read chunks ...
582 // We may reach this point after |On{Source,Dest}{Clos,Error}ed| has responded
583 // to an out-of-band change. Per the comment in |OnSourceErrored|, we want to
584 // allow the implicated shutdown to proceed, and we don't want to interfere
585 // with or additionally alter its operation. Particularly, we don't want to
586 // queue up the successfully-read chunk (if there was one, and this isn't just
587 // reporting "done") to be written: it wasn't "already-read" when that
588 // error/closure happened.
590 // All specified reactions to a closure/error invoke either the shutdown, or
591 // shutdown with an action, algorithms. Those algorithms each abort if either
592 // shutdown algorithm has already been invoked. So we check for shutdown here
593 // in case of asynchronous closure/error and abort if shutdown has already
594 // started (and possibly finished).
596 // TODO: Implement the eventual resolution from
597 // https://github.com/whatwg/streams/issues/1207
598 if (mShuttingDown) {
599 return;
602 RefPtr<WritableStreamDefaultWriter> writer = mWriter;
603 mLastWritePromise =
604 WritableStreamDefaultWriterWrite(aCx, writer, aChunk, aRv);
605 if (aRv.Failed()) {
606 mLastWritePromise = nullptr;
607 return;
610 mLastWritePromise->AppendNativeHandler(
611 new PipeToPumpHandler(this, nullptr, &PipeToPump::OnDestErrored));
613 // Last read has finished, so it's time to start reading again.
614 Read(aCx);
617 void PipeToPump::OnWriterReady(JSContext* aCx, JS::Handle<JS::Value>) {
618 // Writer is ready again (i.e. backpressure was resolved), so read.
619 Read(aCx);
622 struct PipeToReadRequest : public ReadRequest {
623 public:
624 NS_DECL_ISUPPORTS_INHERITED
625 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(PipeToReadRequest, ReadRequest)
627 RefPtr<PipeToPump> mPipeToPump;
629 explicit PipeToReadRequest(PipeToPump* aPipeToPump)
630 : mPipeToPump(aPipeToPump) {}
632 MOZ_CAN_RUN_SCRIPT_BOUNDARY void ChunkSteps(JSContext* aCx,
633 JS::Handle<JS::Value> aChunk,
634 ErrorResult& aRv) override {
635 RefPtr<PipeToPump> pipeToPump = mPipeToPump; // XXX known live?
636 pipeToPump->OnReadFulfilled(aCx, aChunk, aRv);
639 // The reader's closed promise handlers will already call OnSourceClosed/
640 // OnSourceErrored, so these steps can just be ignored.
641 void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {}
642 void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
643 ErrorResult& aRv) override {}
645 protected:
646 virtual ~PipeToReadRequest() = default;
649 NS_IMPL_CYCLE_COLLECTION_INHERITED(PipeToReadRequest, ReadRequest, mPipeToPump)
651 NS_IMPL_ADDREF_INHERITED(PipeToReadRequest, ReadRequest)
652 NS_IMPL_RELEASE_INHERITED(PipeToReadRequest, ReadRequest)
654 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToReadRequest)
655 NS_INTERFACE_MAP_END_INHERITING(ReadRequest)
657 void PipeToPump::Read(JSContext* aCx) {
658 #ifdef DEBUG
659 mReadChunk = true;
660 #endif
662 // (Constraint) Shutdown must stop activity:
663 // If shuttingDown becomes true, the user agent must not initiate
664 // further reads from reader
665 if (mShuttingDown) {
666 return;
669 // (Constraint) Backpressure must be enforced:
670 // While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null,
671 // the user agent must not read from reader.
672 Nullable<double> desiredSize =
673 WritableStreamDefaultWriterGetDesiredSize(mWriter);
674 if (desiredSize.IsNull()) {
675 // This means the writer has errored. This is going to be handled
676 // by the writer closed promise.
677 return;
680 if (desiredSize.Value() <= 0) {
681 // Wait for the writer to become ready before reading more data from
682 // the reader. We don't care about rejections here, because those are
683 // already handled by the writer closed promise.
684 RefPtr<Promise> readyPromise = mWriter->Ready();
685 readyPromise->AppendNativeHandler(
686 new PipeToPumpHandler(this, &PipeToPump::OnWriterReady, nullptr));
687 return;
690 RefPtr<ReadableStreamDefaultReader> reader = mReader;
691 RefPtr<ReadRequest> request = new PipeToReadRequest(this);
692 ErrorResult rv;
693 ReadableStreamDefaultReaderRead(aCx, reader, request, rv);
694 if (rv.MaybeSetPendingException(aCx)) {
695 // XXX It's actually not quite obvious what we should do here.
696 // We've got an error during reading, so on the surface it seems logical
697 // to invoke `OnSourceErrored`. However in certain cases the required
698 // condition > source.[[state]] is or becomes "errored" < won't actually
699 // happen i.e. when `WritableStreamDefaultWriterWrite` called from
700 // `OnReadFulfilled` (via PipeToReadRequest::ChunkSteps) fails in
701 // a synchronous fashion.
702 JS::Rooted<JS::Value> error(aCx);
703 JS::Rooted<Maybe<JS::Value>> someError(aCx);
705 // The error was moved to the JSContext by MaybeSetPendingException.
706 if (JS_GetPendingException(aCx, &error)) {
707 someError = Some(error.get());
710 JS_ClearPendingException(aCx);
712 Shutdown(aCx, someError);
716 // Step 3. Closing must be propagated forward: if source.[[state]] is or
717 // becomes "closed", then
718 void PipeToPump::OnSourceClosed(JSContext* aCx, JS::Handle<JS::Value>) {
719 // Step 3.1. If preventClose is false, shutdown with an action of
720 // ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
721 if (!mPreventClose) {
722 ShutdownWithAction(
723 aCx,
724 [](JSContext* aCx, PipeToPump* aPipeToPump,
725 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
726 MOZ_CAN_RUN_SCRIPT {
727 RefPtr<WritableStreamDefaultWriter> writer = aPipeToPump->mWriter;
728 return WritableStreamDefaultWriterCloseWithErrorPropagation(
729 aCx, writer, aRv);
731 JS::NothingHandleValue);
732 } else {
733 // Step 3.2 Otherwise, shutdown.
734 Shutdown(aCx, JS::NothingHandleValue);
738 // Step 1. Errors must be propagated forward: if source.[[state]] is or
739 // becomes "errored", then
740 void PipeToPump::OnSourceErrored(JSContext* aCx,
741 JS::Handle<JS::Value> aSourceStoredError) {
742 // If |source| becomes errored not during a pending read, it's clear we must
743 // react immediately.
745 // But what if |source| becomes errored *during* a pending read? Should this
746 // first error, or the pending-read second error, predominate? Two semantics
747 // are possible when |source|/|dest| become closed or errored while there's a
748 // pending read:
750 // 1. Wait until the read fulfills or rejects, then respond to the
751 // closure/error without regard to the read having fulfilled or rejected.
752 // (This will simply not react to the read being rejected, or it will
753 // queue up the read chunk to be written during shutdown.)
754 // 2. React to the closure/error immediately per "Error and close states
755 // must be propagated". Then when the read fulfills or rejects later, do
756 // nothing.
758 // The spec doesn't clearly require either semantics. It requires that
759 // *already-read* chunks be written (at least if |dest| didn't become errored
760 // or closed such that no further writes can occur). But it's silent as to
761 // not-fully-read chunks. (These semantic differences may only be observable
762 // with very carefully constructed readable/writable streams.)
764 // It seems best, generally, to react to the temporally-earliest problem that
765 // arises, so we implement option #2. (Blink, in contrast, currently
766 // implements option #1.)
768 // All specified reactions to a closure/error invoke either the shutdown, or
769 // shutdown with an action, algorithms. Those algorithms each abort if either
770 // shutdown algorithm has already been invoked. So we don't need to do
771 // anything special here to deal with a pending read.
773 // TODO: Implement the eventual resolution from
774 // https://github.com/whatwg/streams/issues/1207
776 // Step 1.1 If preventAbort is false, shutdown with an action of
777 // ! WritableStreamAbort(dest, source.[[storedError]])
778 // and with source.[[storedError]].
779 JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aSourceStoredError));
780 if (!mPreventAbort) {
781 ShutdownWithAction(
782 aCx,
783 [](JSContext* aCx, PipeToPump* aPipeToPump,
784 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
785 MOZ_CAN_RUN_SCRIPT {
786 JS::Rooted<JS::Value> error(aCx, *aError);
787 RefPtr<WritableStream> dest = aPipeToPump->mWriter->GetStream();
788 return WritableStreamAbort(aCx, dest, error, aRv);
790 error);
791 } else {
792 // Step 1.1. Otherwise, shutdown with source.[[storedError]].
793 Shutdown(aCx, error);
797 // Step 4. Closing must be propagated backward:
798 // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
799 // or dest.[[state]] is "closed", then
800 void PipeToPump::OnDestClosed(JSContext* aCx, JS::Handle<JS::Value>) {
801 // Step 4.1. Assert: no chunks have been read or written.
802 // Note: No reading automatically implies no writing.
803 // In a perfect world OnDestClosed would only be called before we start
804 // piping, because afterwards the writer has an exclusive lock on the stream.
805 // In reality the closed promise can still be resolved after we release
806 // the lock on the writer in Finalize.
807 if (mShuttingDown) {
808 return;
810 MOZ_ASSERT(!mReadChunk);
812 // Step 4.2. Let destClosed be a new TypeError.
813 JS::Rooted<Maybe<JS::Value>> destClosed(aCx, Nothing());
815 ErrorResult rv;
816 rv.ThrowTypeError("Cannot pipe to closed stream");
817 JS::Rooted<JS::Value> error(aCx);
818 bool ok = ToJSValue(aCx, std::move(rv), &error);
819 MOZ_RELEASE_ASSERT(ok, "must be ok");
820 destClosed = Some(error.get());
823 // Step 4.3. If preventCancel is false, shutdown with an action of
824 // ! ReadableStreamCancel(source, destClosed) and with destClosed.
825 if (!mPreventCancel) {
826 ShutdownWithAction(
827 aCx,
828 [](JSContext* aCx, PipeToPump* aPipeToPump,
829 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
830 MOZ_CAN_RUN_SCRIPT {
831 JS::Rooted<JS::Value> error(aCx, *aError);
832 RefPtr<ReadableStream> dest = aPipeToPump->mReader->GetStream();
833 return ReadableStreamCancel(aCx, dest, error, aRv);
835 destClosed);
836 } else {
837 // Step 4.4. Otherwise, shutdown with destClosed.
838 Shutdown(aCx, destClosed);
842 // Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes
843 // "errored", then
844 void PipeToPump::OnDestErrored(JSContext* aCx,
845 JS::Handle<JS::Value> aDestStoredError) {
846 // Step 2.1. If preventCancel is false, shutdown with an action of
847 // ! ReadableStreamCancel(source, dest.[[storedError]])
848 // and with dest.[[storedError]].
849 JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aDestStoredError));
850 if (!mPreventCancel) {
851 ShutdownWithAction(
852 aCx,
853 [](JSContext* aCx, PipeToPump* aPipeToPump,
854 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
855 MOZ_CAN_RUN_SCRIPT {
856 JS::Rooted<JS::Value> error(aCx, *aError);
857 RefPtr<ReadableStream> dest = aPipeToPump->mReader->GetStream();
858 return ReadableStreamCancel(aCx, dest, error, aRv);
860 error);
861 } else {
862 // Step 2.1. Otherwise, shutdown with dest.[[storedError]].
863 Shutdown(aCx, error);
867 NS_IMPL_CYCLE_COLLECTION_CLASS(PipeToPump)
868 NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPump)
869 NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPump)
870 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPump)
871 NS_INTERFACE_MAP_ENTRY(nsISupports)
872 NS_INTERFACE_MAP_END
874 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(PipeToPump)
875 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mPromise)
876 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader)
877 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mWriter)
878 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mLastWritePromise)
879 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
881 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(PipeToPump)
882 NS_IMPL_CYCLE_COLLECTION_UNLINK(mPromise)
883 NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader)
884 NS_IMPL_CYCLE_COLLECTION_UNLINK(mWriter)
885 NS_IMPL_CYCLE_COLLECTION_UNLINK(mLastWritePromise)
886 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
888 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
889 already_AddRefed<Promise> ReadableStreamPipeTo(
890 ReadableStream* aSource, WritableStream* aDest, bool aPreventClose,
891 bool aPreventAbort, bool aPreventCancel, AbortSignal* aSignal,
892 mozilla::ErrorResult& aRv) {
893 // Step 1. Assert: source implements ReadableStream. (Implicit)
894 // Step 2. Assert: dest implements WritableStream. (Implicit)
895 // Step 3. Assert: preventClose, preventAbort, and preventCancel are all
896 // booleans (Implicit)
897 // Step 4. If signal was not given, let signal be
898 // undefined. (Implicit)
899 // Step 5. Assert: either signal is undefined, or signal
900 // implements AbortSignal. (Implicit)
901 // Step 6. Assert: !IsReadableStreamLocked(source) is false.
902 MOZ_ASSERT(!IsReadableStreamLocked(aSource));
904 // Step 7. Assert: !IsWritableStreamLocked(dest) is false.
905 MOZ_ASSERT(!IsWritableStreamLocked(aDest));
907 AutoJSAPI jsapi;
908 if (!jsapi.Init(aSource->GetParentObject())) {
909 aRv.ThrowUnknownError("Internal error");
910 return nullptr;
912 JSContext* cx = jsapi.cx();
914 // Step 8. If source.[[controller]] implements ReadableByteStreamController,
915 // let reader be either !AcquireReadableStreamBYOBReader(source) or
916 // !AcquireReadableStreamDefaultReader(source), at the user agent’s
917 // discretion.
918 // Step 9. Otherwise, let reader be
919 // !AcquireReadableStreamDefaultReader(source).
921 // Note: In the interests of simplicity, we choose here to always acquire
922 // a default reader.
923 RefPtr<ReadableStreamDefaultReader> reader =
924 AcquireReadableStreamDefaultReader(aSource, aRv);
925 if (aRv.Failed()) {
926 return nullptr;
929 // Step 10. Let writer be ! AcquireWritableStreamDefaultWriter(dest).
930 RefPtr<WritableStreamDefaultWriter> writer =
931 AcquireWritableStreamDefaultWriter(aDest, aRv);
932 if (aRv.Failed()) {
933 return nullptr;
936 // Step 11. Set source.[[disturbed]] to true.
937 aSource->SetDisturbed(true);
939 // Step 12. Let shuttingDown be false.
940 // Note: PipeToPump ensures this by construction.
942 // Step 13. Let promise be a new promise.
943 RefPtr<Promise> promise = Promise::Create(aSource->GetParentObject(), aRv);
944 if (aRv.Failed()) {
945 return nullptr;
948 // Steps 14-15.
949 RefPtr<PipeToPump> pump = new PipeToPump(
950 promise, reader, writer, aPreventClose, aPreventAbort, aPreventCancel);
951 pump->Start(cx, aSignal);
953 // Step 16. Return promise.
954 return promise.forget();
957 } // namespace mozilla::dom