1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "FetchStreamReader.h"
8 #include "InternalResponse.h"
9 #include "mozilla/ConsoleReportCollector.h"
10 #include "mozilla/ErrorResult.h"
11 #include "mozilla/StaticAnalysisFunctions.h"
12 #include "mozilla/dom/AutoEntryScript.h"
13 #include "mozilla/dom/Promise.h"
14 #include "mozilla/dom/PromiseBinding.h"
15 #include "mozilla/dom/ReadableStream.h"
16 #include "mozilla/dom/ReadableStreamDefaultController.h"
17 #include "mozilla/dom/ReadableStreamDefaultReader.h"
18 #include "mozilla/dom/WorkerPrivate.h"
19 #include "mozilla/dom/WorkerRef.h"
20 #include "mozilla/HoldDropJSObjects.h"
21 #include "nsContentUtils.h"
23 #include "nsIAsyncInputStream.h"
25 #include "nsIScriptError.h"
26 #include "nsPIDOMWindow.h"
29 namespace mozilla::dom
{
31 NS_IMPL_CYCLE_COLLECTING_ADDREF(FetchStreamReader
)
32 NS_IMPL_CYCLE_COLLECTING_RELEASE(FetchStreamReader
)
34 NS_IMPL_CYCLE_COLLECTION(FetchStreamReader
, mGlobal
, mReader
)
36 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchStreamReader
)
37 NS_INTERFACE_MAP_ENTRY(nsIOutputStreamCallback
)
38 NS_INTERFACE_MAP_ENTRY(nsISupports
)
42 nsresult
FetchStreamReader::Create(JSContext
* aCx
, nsIGlobalObject
* aGlobal
,
43 FetchStreamReader
** aStreamReader
,
44 nsIInputStream
** aInputStream
) {
47 MOZ_ASSERT(aStreamReader
);
48 MOZ_ASSERT(aInputStream
);
50 RefPtr
<FetchStreamReader
> streamReader
= new FetchStreamReader(aGlobal
);
52 nsCOMPtr
<nsIAsyncInputStream
> pipeIn
;
54 NS_NewPipe2(getter_AddRefs(pipeIn
), getter_AddRefs(streamReader
->mPipeOut
),
57 pipeIn
.forget(aInputStream
);
58 streamReader
.forget(aStreamReader
);
62 nsresult
FetchStreamReader::MaybeGrabStrongWorkerRef(JSContext
* aCx
) {
63 if (NS_IsMainThread()) {
67 WorkerPrivate
* workerPrivate
= GetWorkerPrivateFromContext(aCx
);
68 MOZ_ASSERT(workerPrivate
);
70 RefPtr
<StrongWorkerRef
> workerRef
= StrongWorkerRef::Create(
71 workerPrivate
, "FetchStreamReader", [streamReader
= RefPtr(this)]() {
72 MOZ_ASSERT(streamReader
);
74 // mAsyncWaitWorkerRef may keep the (same) StrongWorkerRef alive even
75 // when mWorkerRef has already been nulled out by a previous call to
76 // CloseAndRelease, we can just safely ignore this callback then
77 // (as would the CloseAndRelease do on a second call).
78 if (streamReader
->mWorkerRef
) {
79 streamReader
->CloseAndRelease(
80 streamReader
->mWorkerRef
->Private()->GetJSContext(),
81 NS_ERROR_DOM_INVALID_STATE_ERR
);
83 MOZ_DIAGNOSTIC_ASSERT(streamReader
->mAsyncWaitWorkerRef
);
87 if (NS_WARN_IF(!workerRef
)) {
88 return NS_ERROR_DOM_INVALID_STATE_ERR
;
91 // These 2 objects create a ref-cycle here that is broken when the stream is
92 // closed or the worker shutsdown.
93 mWorkerRef
= std::move(workerRef
);
98 FetchStreamReader::FetchStreamReader(nsIGlobalObject
* aGlobal
)
99 : mGlobal(aGlobal
), mOwningEventTarget(mGlobal
->SerialEventTarget()) {
103 FetchStreamReader::~FetchStreamReader() {
104 CloseAndRelease(nullptr, NS_BASE_STREAM_CLOSED
);
107 // If a context is provided, an attempt will be made to cancel the reader. The
108 // only situation where we don't expect to have a context is when closure is
109 // being triggered from the destructor or the WorkerRef is notifying. If
110 // we're at the destructor, it's far too late to cancel anything. And if the
111 // WorkerRef is being notified, the global is going away, so there's also
112 // no need to do further JS work.
113 void FetchStreamReader::CloseAndRelease(JSContext
* aCx
, nsresult aStatus
) {
114 NS_ASSERT_OWNINGTHREAD(FetchStreamReader
);
121 RefPtr
<FetchStreamReader
> kungFuDeathGrip
= this;
122 if (aCx
&& mReader
) {
124 if (aStatus
== NS_ERROR_DOM_WRONG_TYPE_ERR
) {
125 rv
.ThrowTypeError
<MSG_FETCH_BODY_WRONG_TYPE
>();
129 JS::Rooted
<JS::Value
> errorValue(aCx
);
130 if (ToJSValue(aCx
, std::move(rv
), &errorValue
)) {
131 IgnoredErrorResult ignoredError
;
132 // It's currently safe to cancel an already closed reader because, per the
133 // comments in ReadableStream::cancel() conveying the spec, step 2 of
134 // 3.4.3 that specified ReadableStreamCancel is: If stream.[[state]] is
135 // "closed", return a new promise resolved with undefined.
136 RefPtr
<Promise
> cancelResultPromise
=
137 MOZ_KnownLive(mReader
)->Cancel(aCx
, errorValue
, ignoredError
);
138 NS_WARNING_ASSERTION(!ignoredError
.Failed(),
139 "Failed to cancel stream during close and release");
140 if (cancelResultPromise
) {
141 bool setHandled
= cancelResultPromise
->SetAnyPromiseIsHandled();
142 NS_WARNING_ASSERTION(setHandled
,
143 "Failed to mark cancel promise as handled.");
148 // We don't want to propagate exceptions during the cleanup.
149 JS_ClearPendingException(aCx
);
152 mStreamClosed
= true;
157 mPipeOut
->CloseWithStatus(aStatus
);
161 mWorkerRef
= nullptr;
167 // https://fetch.spec.whatwg.org/#body-incrementally-read
168 void FetchStreamReader::StartConsuming(JSContext
* aCx
, ReadableStream
* aStream
,
170 MOZ_DIAGNOSTIC_ASSERT(!mReader
);
171 MOZ_DIAGNOSTIC_ASSERT(aStream
);
172 MOZ_ASSERT(!aStream
->MaybeGetInputStreamIfUnread(),
173 "FetchStreamReader is for JS streams but we got a stream based on "
174 "nsIInputStream here. Extract nsIInputStream and read it instead "
175 "to reduce overhead.");
177 aRv
= MaybeGrabStrongWorkerRef(aCx
);
179 CloseAndRelease(aCx
, NS_ERROR_DOM_INVALID_STATE_ERR
);
183 // Step 2: Let reader be the result of getting a reader for body’s stream.
184 RefPtr
<ReadableStreamDefaultReader
> reader
= aStream
->GetReader(aRv
);
186 CloseAndRelease(aCx
, NS_ERROR_DOM_INVALID_STATE_ERR
);
192 mAsyncWaitWorkerRef
= mWorkerRef
;
193 aRv
= mPipeOut
->AsyncWait(this, 0, 0, mOwningEventTarget
);
194 if (NS_WARN_IF(aRv
.Failed())) {
195 mAsyncWaitWorkerRef
= nullptr;
196 CloseAndRelease(aCx
, NS_ERROR_DOM_INVALID_STATE_ERR
);
200 struct FetchReadRequest
: public ReadRequest
{
202 NS_DECL_ISUPPORTS_INHERITED
203 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(FetchReadRequest
, ReadRequest
)
205 explicit FetchReadRequest(FetchStreamReader
* aReader
)
206 : mFetchStreamReader(aReader
) {}
208 MOZ_CAN_RUN_SCRIPT_BOUNDARY
209 void ChunkSteps(JSContext
* aCx
, JS::Handle
<JS::Value
> aChunk
,
210 ErrorResult
& aRv
) override
{
211 mFetchStreamReader
->ChunkSteps(aCx
, aChunk
, aRv
);
214 MOZ_CAN_RUN_SCRIPT_BOUNDARY
215 void CloseSteps(JSContext
* aCx
, ErrorResult
& aRv
) override
{
216 mFetchStreamReader
->CloseSteps(aCx
, aRv
);
219 MOZ_CAN_RUN_SCRIPT_BOUNDARY
220 void ErrorSteps(JSContext
* aCx
, JS::Handle
<JS::Value
> aError
,
221 ErrorResult
& aRv
) override
{
222 mFetchStreamReader
->ErrorSteps(aCx
, aError
, aRv
);
226 virtual ~FetchReadRequest() = default;
228 MOZ_KNOWN_LIVE RefPtr
<FetchStreamReader
> mFetchStreamReader
;
231 NS_IMPL_CYCLE_COLLECTION_INHERITED(FetchReadRequest
, ReadRequest
,
233 NS_IMPL_ADDREF_INHERITED(FetchReadRequest
, ReadRequest
)
234 NS_IMPL_RELEASE_INHERITED(FetchReadRequest
, ReadRequest
)
235 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchReadRequest
)
236 NS_INTERFACE_MAP_END_INHERITING(ReadRequest
)
238 // nsIOutputStreamCallback interface
239 MOZ_CAN_RUN_SCRIPT_BOUNDARY
241 FetchStreamReader::OnOutputStreamReady(nsIAsyncOutputStream
* aStream
) {
242 NS_ASSERT_OWNINGTHREAD(FetchStreamReader
);
244 mAsyncWaitWorkerRef
= nullptr;
248 AutoEntryScript
aes(mGlobal
, "ReadableStreamReader.read", !mWorkerRef
);
249 if (!Process(aes
.cx())) {
250 // We're done processing data, and haven't queued up a new AsyncWait - we
251 // can clear our mAsyncWaitWorkerRef.
252 mAsyncWaitWorkerRef
= nullptr;
257 bool FetchStreamReader::Process(JSContext
* aCx
) {
258 NS_ASSERT_OWNINGTHREAD(FetchStreamReader
);
261 if (!mBuffer
.IsEmpty()) {
262 nsresult rv
= WriteBuffer();
263 if (NS_WARN_IF(NS_FAILED(rv
))) {
264 CloseAndRelease(aCx
, NS_ERROR_DOM_ABORT_ERR
);
270 // Check if the output stream has already been closed. This lets us propagate
271 // errors eagerly, and detect output stream closures even when we have no data
273 if (NS_WARN_IF(NS_FAILED(mPipeOut
->StreamStatus()))) {
274 CloseAndRelease(aCx
, NS_ERROR_DOM_ABORT_ERR
);
278 // We're waiting on new data - set up a WAIT_CLOSURE_ONLY callback so we
279 // notice if the reader closes.
280 nsresult rv
= mPipeOut
->AsyncWait(
281 this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY
, 0, mOwningEventTarget
);
282 if (NS_WARN_IF(NS_FAILED(rv
))) {
283 CloseAndRelease(aCx
, NS_ERROR_DOM_INVALID_STATE_ERR
);
287 // If we already have an outstanding read request, don't start another one
289 if (!mHasOutstandingReadRequest
) {
290 // https://fetch.spec.whatwg.org/#incrementally-read-loop
291 // The below very loosely tries to implement the incrementally-read-loop
292 // from the fetch spec.
293 // Step 2: Read a chunk from reader given readRequest.
294 RefPtr
<ReadRequest
> readRequest
= new FetchReadRequest(this);
295 RefPtr
<ReadableStreamDefaultReader
> reader
= mReader
;
296 mHasOutstandingReadRequest
= true;
298 IgnoredErrorResult err
;
299 reader
->ReadChunk(aCx
, *readRequest
, err
);
300 if (NS_WARN_IF(err
.Failed())) {
301 // Let's close the stream.
302 mHasOutstandingReadRequest
= false;
303 CloseAndRelease(aCx
, NS_ERROR_DOM_INVALID_STATE_ERR
);
304 // Don't return false, as we've already called `AsyncWait`.
310 void FetchStreamReader::ChunkSteps(JSContext
* aCx
, JS::Handle
<JS::Value
> aChunk
,
312 // This roughly implements the chunk steps from
313 // https://fetch.spec.whatwg.org/#incrementally-read-loop.
315 mHasOutstandingReadRequest
= false;
317 // Step 2. If chunk is not a Uint8Array object, then set continueAlgorithm to
318 // this step: run processBodyError given a TypeError.
319 RootedSpiderMonkeyInterface
<Uint8Array
> chunk(aCx
);
320 if (!aChunk
.isObject() || !chunk
.Init(&aChunk
.toObject())) {
321 CloseAndRelease(aCx
, NS_ERROR_DOM_WRONG_TYPE_ERR
);
325 MOZ_DIAGNOSTIC_ASSERT(mBuffer
.IsEmpty());
327 // Let's take a copy of the data.
328 // FIXME: We could sometimes avoid this copy by trying to write `chunk`
329 // directly into `mPipeOut` eagerly, and only filling `mBuffer` if there isn't
330 // enough space in the pipe's buffer.
331 if (!chunk
.AppendDataTo(mBuffer
)) {
332 CloseAndRelease(aCx
, NS_ERROR_OUT_OF_MEMORY
);
337 mBufferRemaining
= mBuffer
.Length();
339 nsresult rv
= WriteBuffer();
340 if (NS_WARN_IF(NS_FAILED(rv
))) {
341 CloseAndRelease(aCx
, NS_ERROR_DOM_ABORT_ERR
);
345 void FetchStreamReader::CloseSteps(JSContext
* aCx
, ErrorResult
& aRv
) {
346 mHasOutstandingReadRequest
= false;
347 CloseAndRelease(aCx
, NS_BASE_STREAM_CLOSED
);
350 void FetchStreamReader::ErrorSteps(JSContext
* aCx
, JS::Handle
<JS::Value
> aError
,
352 mHasOutstandingReadRequest
= false;
353 ReportErrorToConsole(aCx
, aError
);
354 CloseAndRelease(aCx
, NS_ERROR_FAILURE
);
357 nsresult
FetchStreamReader::WriteBuffer() {
358 MOZ_ASSERT(mBuffer
.Length() == (mBufferOffset
+ mBufferRemaining
));
360 char* data
= reinterpret_cast<char*>(mBuffer
.Elements());
362 while (mBufferRemaining
> 0) {
363 uint32_t written
= 0;
365 mPipeOut
->Write(data
+ mBufferOffset
, mBufferRemaining
, &written
);
367 if (rv
== NS_BASE_STREAM_WOULD_BLOCK
) {
371 if (NS_WARN_IF(NS_FAILED(rv
))) {
375 MOZ_ASSERT(written
<= mBufferRemaining
);
376 mBufferRemaining
-= written
;
377 mBufferOffset
+= written
;
379 if (mBufferRemaining
== 0) {
385 nsresult rv
= mPipeOut
->AsyncWait(this, 0, 0, mOwningEventTarget
);
386 if (NS_WARN_IF(NS_FAILED(rv
))) {
393 void FetchStreamReader::ReportErrorToConsole(JSContext
* aCx
,
394 JS::Handle
<JS::Value
> aValue
) {
395 nsCString sourceSpec
;
398 nsString valueString
;
400 nsContentUtils::ExtractErrorValues(aCx
, aValue
, sourceSpec
, &line
, &column
,
403 nsTArray
<nsString
> params
;
404 params
.AppendElement(valueString
);
406 RefPtr
<ConsoleReportCollector
> reporter
= new ConsoleReportCollector();
407 reporter
->AddConsoleReport(nsIScriptError::errorFlag
,
408 "ReadableStreamReader.read"_ns
,
409 nsContentUtils::eDOM_PROPERTIES
, sourceSpec
, line
,
410 column
, "ReadableStreamReadingFailed"_ns
, params
);
412 uint64_t innerWindowId
= 0;
414 if (NS_IsMainThread()) {
415 nsCOMPtr
<nsPIDOMWindowInner
> window
= do_QueryInterface(mGlobal
);
417 innerWindowId
= window
->WindowID();
419 reporter
->FlushReportsToConsole(innerWindowId
);
423 WorkerPrivate
* workerPrivate
= GetWorkerPrivateFromContext(aCx
);
425 innerWindowId
= workerPrivate
->WindowID();
428 RefPtr
<Runnable
> r
= NS_NewRunnableFunction(
429 "FetchStreamReader::ReportErrorToConsole", [reporter
, innerWindowId
]() {
430 reporter
->FlushReportsToConsole(innerWindowId
);
433 workerPrivate
->DispatchToMainThread(r
.forget());
436 } // namespace mozilla::dom