1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "FetchStreamReader.h"
8 #include "InternalResponse.h"
9 #include "mozilla/ConsoleReportCollector.h"
10 #include "mozilla/ErrorResult.h"
11 #include "mozilla/StaticAnalysisFunctions.h"
12 #include "mozilla/dom/AutoEntryScript.h"
13 #include "mozilla/dom/Promise.h"
14 #include "mozilla/dom/PromiseBinding.h"
15 #include "mozilla/dom/ReadableStream.h"
16 #include "mozilla/dom/ReadableStreamDefaultController.h"
17 #include "mozilla/dom/ReadableStreamDefaultReader.h"
18 #include "mozilla/dom/WorkerPrivate.h"
19 #include "mozilla/dom/WorkerRef.h"
20 #include "mozilla/HoldDropJSObjects.h"
21 #include "mozilla/TaskCategory.h"
22 #include "nsContentUtils.h"
24 #include "nsIAsyncInputStream.h"
26 #include "nsIScriptError.h"
27 #include "nsPIDOMWindow.h"
30 namespace mozilla::dom
{
// Cycle-collecting AddRef/Release, cycle-collection participant and interface
// map for FetchStreamReader.
//  - Unlink and traverse both list the same two members: mGlobal and mReader.
//  - The trace section is declared but contains no entries.
//  - The interface map exposes nsIOutputStreamCallback and resolves the
//    ambiguous nsISupports through that same interface.
32 NS_IMPL_CYCLE_COLLECTING_ADDREF(FetchStreamReader
)
33 NS_IMPL_CYCLE_COLLECTING_RELEASE(FetchStreamReader
)
35 NS_IMPL_CYCLE_COLLECTION_CLASS(FetchStreamReader
)
37 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(FetchStreamReader
)
38 NS_IMPL_CYCLE_COLLECTION_UNLINK(mGlobal
)
39 NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader
)
40 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
42 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(FetchStreamReader
)
43 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mGlobal
)
44 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader
)
45 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
47 NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(FetchStreamReader
)
48 NS_IMPL_CYCLE_COLLECTION_TRACE_END
50 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchStreamReader
)
51 NS_INTERFACE_MAP_ENTRY(nsIOutputStreamCallback
)
52 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports
, nsIOutputStreamCallback
)
// Factory: creates a FetchStreamReader for aGlobal together with a pipe.
//
// aCx           - used off the main thread to look up the WorkerPrivate.
// aGlobal       - forwarded to the FetchStreamReader constructor.
// aStreamReader - out-param (asserted non-null); receives the new reader.
// aInputStream  - out-param (asserted non-null); receives the pipe's input
//                 end. The pipe's output end is kept in the reader's mPipeOut.
//
// Off the main thread a StrongWorkerRef is also created and stored in
// mWorkerRef. If that StrongWorkerRef cannot be created, the pipe's output end
// is closed with NS_ERROR_DOM_INVALID_STATE_ERR and the same code is returned.
56 nsresult
FetchStreamReader::Create(JSContext
* aCx
, nsIGlobalObject
* aGlobal
,
57 FetchStreamReader
** aStreamReader
,
58 nsIInputStream
** aInputStream
) {
61 MOZ_ASSERT(aStreamReader
);
62 MOZ_ASSERT(aInputStream
);
64 RefPtr
<FetchStreamReader
> streamReader
= new FetchStreamReader(aGlobal
);
66 nsCOMPtr
<nsIAsyncInputStream
> pipeIn
;
// Create the pipe: the input end goes to the local pipeIn, the output end
// straight into the reader's mPipeOut member.
68 NS_NewPipe2(getter_AddRefs(pipeIn
), getter_AddRefs(streamReader
->mPipeOut
),
// Off the main thread we expect to be running on a worker (asserted below).
71 if (!NS_IsMainThread()) {
72 WorkerPrivate
* workerPrivate
= GetWorkerPrivateFromContext(aCx
);
73 MOZ_ASSERT(workerPrivate
);
// The callback captures a strong reference to the reader; together with
// mWorkerRef (assigned further down) this forms the ref-cycle mentioned below.
75 RefPtr
<StrongWorkerRef
> workerRef
= StrongWorkerRef::Create(
76 workerPrivate
, "FetchStreamReader", [streamReader
]() {
77 MOZ_ASSERT(streamReader
);
79 // mAsyncWaitWorkerRef may keep the (same) StrongWorkerRef alive even
80 // when mWorkerRef has already been nulled out by a previous call to
81 // CloseAndRelease, we can just safely ignore this callback then
82 // (as would the CloseAndRelease do on a second call).
83 if (streamReader
->mWorkerRef
) {
84 streamReader
->CloseAndRelease(
85 streamReader
->mWorkerRef
->Private()->GetJSContext(),
86 NS_ERROR_DOM_INVALID_STATE_ERR
);
88 MOZ_DIAGNOSTIC_ASSERT(streamReader
->mAsyncWaitWorkerRef
);
// Without a worker ref the reader cannot be used: close the pipe's output end
// and report the failure to the caller.
92 if (NS_WARN_IF(!workerRef
)) {
93 streamReader
->mPipeOut
->CloseWithStatus(NS_ERROR_DOM_INVALID_STATE_ERR
);
94 return NS_ERROR_DOM_INVALID_STATE_ERR
;
97 // These 2 objects create a ref-cycle here that is broken when the stream is
98 // closed or the worker shuts down.
99 streamReader
->mWorkerRef
= std::move(workerRef
);
// Hand ownership of both results over to the caller's out-params.
102 pipeIn
.forget(aInputStream
);
103 streamReader
.forget(aStreamReader
);
// Constructs a reader for aGlobal. mOwningEventTarget is taken from
// mGlobal->EventTargetFor(TaskCategory::Other); it is the target later passed
// to every mPipeOut->AsyncWait() call in this file. The object is registered
// with HoldJSObjects(), which the destructor balances with DropJSObjects().
107 FetchStreamReader::FetchStreamReader(nsIGlobalObject
* aGlobal
)
109 mOwningEventTarget(mGlobal
->EventTargetFor(TaskCategory::Other
)) {
112 mozilla::HoldJSObjects(this);
// Runs CloseAndRelease() with NS_BASE_STREAM_CLOSED and a null JSContext (so
// the reader-cancellation path, which requires a context, is skipped), then
// balances the constructor's HoldJSObjects() with DropJSObjects().
115 FetchStreamReader::~FetchStreamReader() {
116 CloseAndRelease(nullptr, NS_BASE_STREAM_CLOSED
);
118 mozilla::DropJSObjects(this);
121 // If a context is provided, an attempt will be made to cancel the reader. The
122 // only situation where we don't expect to have a context is when closure is
123 // being triggered from the destructor or the WorkerRef is notifying. If
124 // we're at the destructor, it's far too late to cancel anything. And if the
125 // WorkerRef is being notified, the global is going away, so there's also
126 // no need to do further JS work.
//
// NOTE(review): the StrongWorkerRef callback set up in Create() passes the
// worker's JSContext rather than nullptr - confirm whether the WorkerRef part
// of the comment above is still accurate.
//
// aCx     - may be null; the reader is only cancelled when both aCx and
//           mReader are non-null.
// aStatus - status handed to mPipeOut->CloseWithStatus(). When it equals
//           NS_ERROR_DOM_WRONG_TYPE_ERR and the reader is being cancelled, a
//           TypeError (MSG_FETCH_BODY_WRONG_TYPE) is thrown on the ErrorResult
//           that is then converted into the cancellation reason.
//
// Must be called on the owning thread (asserted).
127 void FetchStreamReader::CloseAndRelease(JSContext
* aCx
, nsresult aStatus
) {
128 NS_ASSERT_OWNINGTHREAD(FetchStreamReader
);
// Hold a strong reference to ourselves for the rest of this function.
135 RefPtr
<FetchStreamReader
> kungFuDeathGrip
= this;
136 if (aCx
&& mReader
) {
138 if (aStatus
== NS_ERROR_DOM_WRONG_TYPE_ERR
) {
139 rv
.ThrowTypeError
<MSG_FETCH_BODY_WRONG_TYPE
>();
// Turn the ErrorResult into a JS value and use it as the cancel reason.
143 JS::Rooted
<JS::Value
> errorValue(aCx
);
144 if (ToJSValue(aCx
, std::move(rv
), &errorValue
)) {
145 IgnoredErrorResult ignoredError
;
146 // It's currently safe to cancel an already closed reader because, per the
147 // comments in ReadableStream::cancel() conveying the spec, step 2 of
148 // 3.4.3 that specified ReadableStreamCancel is: If stream.[[state]] is
149 // "closed", return a new promise resolved with undefined.
150 RefPtr
<Promise
> cancelResultPromise
=
151 MOZ_KnownLive(mReader
)->Cancel(aCx
, errorValue
, ignoredError
);
152 NS_WARNING_ASSERTION(!ignoredError
.Failed(),
153 "Failed to cancel stream during close and release");
// Nobody observes the promise returned by Cancel(); mark it as handled.
154 if (cancelResultPromise
) {
155 bool setHandled
= cancelResultPromise
->SetAnyPromiseIsHandled();
156 NS_WARNING_ASSERTION(setHandled
,
157 "Failed to mark cancel promise as handled.");
162 // We don't want to propagate exceptions during the cleanup.
163 JS_ClearPendingException(aCx
);
// Record the closed state, close the pipe's output end with the caller's
// status and drop our worker reference.
166 mStreamClosed
= true;
171 mPipeOut
->CloseWithStatus(aStatus
);
175 mWorkerRef
= nullptr;
// Starts consuming aStream: obtains a default reader from it and arms an
// AsyncWait() on mPipeOut with `this` as the callback, so that
// OnOutputStreamReady() is invoked on mOwningEventTarget.
//
// Preconditions (diagnostic-asserted): mReader is not set yet and aStream is
// non-null. The failure paths visible here close the reader with
// NS_ERROR_DOM_INVALID_STATE_ERR; a failure of AsyncWait() is also left in aRv.
181 // https://fetch.spec.whatwg.org/#body-incrementally-read
182 void FetchStreamReader::StartConsuming(JSContext
* aCx
, ReadableStream
* aStream
,
184 MOZ_DIAGNOSTIC_ASSERT(!mReader
);
185 MOZ_DIAGNOSTIC_ASSERT(aStream
);
187 // Step 2: Let reader be the result of getting a reader for body’s stream.
188 RefPtr
<ReadableStreamDefaultReader
> reader
= aStream
->GetReader(aRv
);
190 CloseAndRelease(aCx
, NS_ERROR_DOM_INVALID_STATE_ERR
);
// Take a second reference to the worker ref before arming the wait; it is
// dropped again right away if AsyncWait() fails.
196 mAsyncWaitWorkerRef
= mWorkerRef
;
197 aRv
= mPipeOut
->AsyncWait(this, 0, 0, mOwningEventTarget
);
198 if (NS_WARN_IF(aRv
.Failed())) {
199 mAsyncWaitWorkerRef
= nullptr;
200 CloseAndRelease(aCx
, NS_ERROR_DOM_INVALID_STATE_ERR
);
// ReadRequest implementation that forwards the chunk, close and error steps
// unchanged to the FetchStreamReader it was constructed with. The reader is
// held through a strong reference (mFetchStreamReader).
204 struct FetchReadRequest
: public ReadRequest
{
206 NS_DECL_ISUPPORTS_INHERITED
207 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(FetchReadRequest
, ReadRequest
)
// aReader is stored as-is; no null check is performed here.
209 explicit FetchReadRequest(FetchStreamReader
* aReader
)
210 : mFetchStreamReader(aReader
) {}
// Forwards to FetchStreamReader::ChunkSteps().
212 MOZ_CAN_RUN_SCRIPT_BOUNDARY
213 void ChunkSteps(JSContext
* aCx
, JS::Handle
<JS::Value
> aChunk
,
214 ErrorResult
& aRv
) override
{
215 mFetchStreamReader
->ChunkSteps(aCx
, aChunk
, aRv
);
// Forwards to FetchStreamReader::CloseSteps().
218 MOZ_CAN_RUN_SCRIPT_BOUNDARY
219 void CloseSteps(JSContext
* aCx
, ErrorResult
& aRv
) override
{
220 mFetchStreamReader
->CloseSteps(aCx
, aRv
);
// Forwards to FetchStreamReader::ErrorSteps().
223 MOZ_CAN_RUN_SCRIPT_BOUNDARY
224 void ErrorSteps(JSContext
* aCx
, JS::Handle
<JS::Value
> aError
,
225 ErrorResult
& aRv
) override
{
226 mFetchStreamReader
->ErrorSteps(aCx
, aError
, aRv
);
230 virtual ~FetchReadRequest() = default;
// Strong reference to the reader all steps are forwarded to.
232 MOZ_KNOWN_LIVE RefPtr
<FetchStreamReader
> mFetchStreamReader
;
// Cycle-collection, AddRef/Release and interface-map boilerplate for
// FetchReadRequest; everything is declared as inheriting from ReadRequest.
235 NS_IMPL_CYCLE_COLLECTION_INHERITED(FetchReadRequest
, ReadRequest
,
237 NS_IMPL_ADDREF_INHERITED(FetchReadRequest
, ReadRequest
)
238 NS_IMPL_RELEASE_INHERITED(FetchReadRequest
, ReadRequest
)
239 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchReadRequest
)
240 NS_INTERFACE_MAP_END_INHERITING(ReadRequest
)
242 // nsIOutputStreamCallback interface
//
// Callback for the AsyncWait() calls made on mPipeOut with `this` as the
// listener. Must run on the owning thread (asserted). Enters script on mGlobal
// under the label "ReadableStreamReader.read" and runs Process() with that
// context; when Process() returns false, mAsyncWaitWorkerRef is released.
// aStream is not referenced in the lines shown here.
243 MOZ_CAN_RUN_SCRIPT_BOUNDARY
245 FetchStreamReader::OnOutputStreamReady(nsIAsyncOutputStream
* aStream
) {
246 NS_ASSERT_OWNINGTHREAD(FetchStreamReader
);
248 mAsyncWaitWorkerRef
= nullptr;
// The third argument is true only when no worker ref is held - presumably the
// "is main thread" flag of AutoEntryScript; TODO confirm against its
// declaration.
252 AutoEntryScript
aes(mGlobal
, "ReadableStreamReader.read", !mWorkerRef
);
253 if (!Process(aes
.cx())) {
254 // We're done processing data, and haven't queued up a new AsyncWait - we
255 // can clear our mAsyncWaitWorkerRef.
256 mAsyncWaitWorkerRef
= nullptr;
// Performs one step of pumping data from mReader into mPipeOut. Must run on
// the owning thread (asserted).
//
// The steps visible here are: flush pending mBuffer data via WriteBuffer(),
// check the pipe's StreamStatus(), arm a closure-only AsyncWait(), and start a
// new ReadChunk() when no read request is outstanding. Every failure path
// calls CloseAndRelease() with the status shown at that call site.
//
// Returns a bool; the caller in OnOutputStreamReady() treats `false` as "no
// new AsyncWait was queued".
261 bool FetchStreamReader::Process(JSContext
* aCx
) {
262 NS_ASSERT_OWNINGTHREAD(FetchStreamReader
);
// Data left over from a previous chunk: try to write it to the pipe.
265 if (!mBuffer
.IsEmpty()) {
266 nsresult rv
= WriteBuffer();
267 if (NS_WARN_IF(NS_FAILED(rv
))) {
268 CloseAndRelease(aCx
, NS_ERROR_DOM_ABORT_ERR
);
274 // Check if the output stream has already been closed. This lets us propagate
275 // errors eagerly, and detect output stream closures even when we have no data
277 if (NS_WARN_IF(NS_FAILED(mPipeOut
->StreamStatus()))) {
278 CloseAndRelease(aCx
, NS_ERROR_DOM_ABORT_ERR
);
282 // We're waiting on new data - set up a WAIT_CLOSURE_ONLY callback so we
283 // notice if the reader closes.
284 nsresult rv
= mPipeOut
->AsyncWait(
285 this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY
, 0, mOwningEventTarget
);
286 if (NS_WARN_IF(NS_FAILED(rv
))) {
287 CloseAndRelease(aCx
, NS_ERROR_DOM_INVALID_STATE_ERR
);
291 // If we already have an outstanding read request, don't start another one
293 if (!mHasOutstandingReadRequest
) {
294 // https://fetch.spec.whatwg.org/#incrementally-read-loop
295 // The below very loosely tries to implement the incrementally-read-loop
296 // from the fetch spec.
297 // Step 2: Read a chunk from reader given readRequest.
298 RefPtr
<ReadRequest
> readRequest
= new FetchReadRequest(this);
// Work on a local strong reference to the reader for the ReadChunk() call.
299 RefPtr
<ReadableStreamDefaultReader
> reader
= mReader
;
// The flag is set before ReadChunk() is called and is reset by
// ChunkSteps()/CloseSteps()/ErrorSteps(), or just below if ReadChunk() fails.
300 mHasOutstandingReadRequest
= true;
302 IgnoredErrorResult err
;
303 reader
->ReadChunk(aCx
, *readRequest
, err
);
304 if (NS_WARN_IF(err
.Failed())) {
305 // Let's close the stream.
306 mHasOutstandingReadRequest
= false;
307 CloseAndRelease(aCx
, NS_ERROR_DOM_INVALID_STATE_ERR
);
308 // Don't return false, as we've already called `AsyncWait`.
// Chunk steps for a completed read request: clears the outstanding-read flag,
// validates that aChunk is a Uint8Array, copies its bytes into mBuffer and
// starts writing them to the pipe via WriteBuffer().
//
// Statuses passed to CloseAndRelease() on failure:
//  - NS_ERROR_DOM_WRONG_TYPE_ERR: aChunk is not an object or does not
//    initialize as a Uint8Array.
//  - NS_ERROR_OUT_OF_MEMORY: the fallible append to mBuffer failed.
//  - NS_ERROR_DOM_ABORT_ERR: WriteBuffer() failed.
314 void FetchStreamReader::ChunkSteps(JSContext
* aCx
, JS::Handle
<JS::Value
> aChunk
,
316 // This roughly implements the chunk steps from
317 // https://fetch.spec.whatwg.org/#incrementally-read-loop.
319 mHasOutstandingReadRequest
= false;
321 // Step 2. If chunk is not a Uint8Array object, then set continueAlgorithm to
322 // this step: run processBodyError given a TypeError.
323 RootedSpiderMonkeyInterface
<Uint8Array
> chunk(aCx
);
324 if (!aChunk
.isObject() || !chunk
.Init(&aChunk
.toObject())) {
325 CloseAndRelease(aCx
, NS_ERROR_DOM_WRONG_TYPE_ERR
);
// ComputeState() is called before Data()/Length() are read below.
328 chunk
.ComputeState();
// A new chunk is only expected once the previous one has been fully written.
330 MOZ_DIAGNOSTIC_ASSERT(mBuffer
.IsEmpty());
332 // Let's take a copy of the data.
333 // FIXME: We could sometimes avoid this copy by trying to write `chunk`
334 // directly into `mPipeOut` eagerly, and only filling `mBuffer` if there isn't
335 // enough space in the pipe's buffer.
336 if (!mBuffer
.AppendElements(chunk
.Data(), chunk
.Length(), fallible
)) {
337 CloseAndRelease(aCx
, NS_ERROR_OUT_OF_MEMORY
);
// The whole chunk is still unwritten at this point.
342 mBufferRemaining
= chunk
.Length();
344 nsresult rv
= WriteBuffer();
345 if (NS_WARN_IF(NS_FAILED(rv
))) {
346 CloseAndRelease(aCx
, NS_ERROR_DOM_ABORT_ERR
);
// Close steps for a read request: clears the outstanding-read flag and closes
// the reader with NS_BASE_STREAM_CLOSED. aRv is not used.
350 void FetchStreamReader::CloseSteps(JSContext
* aCx
, ErrorResult
& aRv
) {
351 mHasOutstandingReadRequest
= false;
352 CloseAndRelease(aCx
, NS_BASE_STREAM_CLOSED
);
// Error steps for a read request: clears the outstanding-read flag, reports
// aError to the console via ReportErrorToConsole(), then closes the reader
// with NS_ERROR_FAILURE.
355 void FetchStreamReader::ErrorSteps(JSContext
* aCx
, JS::Handle
<JS::Value
> aError
,
357 mHasOutstandingReadRequest
= false;
358 ReportErrorToConsole(aCx
, aError
);
359 CloseAndRelease(aCx
, NS_ERROR_FAILURE
);
// Writes the not-yet-written tail of mBuffer (mBufferRemaining bytes starting
// at mBufferOffset) to mPipeOut. After each Write(), mBufferRemaining shrinks
// and mBufferOffset grows by the number of bytes the pipe accepted.
// NS_BASE_STREAM_WOULD_BLOCK is checked separately from other failure codes.
// An AsyncWait() with `this` as the callback is armed on mPipeOut after the
// loop.
//
// Invariant (asserted on entry):
//   mBuffer.Length() == mBufferOffset + mBufferRemaining.
362 nsresult
FetchStreamReader::WriteBuffer() {
363 MOZ_ASSERT(mBuffer
.Length() == (mBufferOffset
+ mBufferRemaining
));
// Write() is called with a char pointer, so reinterpret the buffer's elements.
365 char* data
= reinterpret_cast<char*>(mBuffer
.Elements());
367 while (mBufferRemaining
> 0) {
368 uint32_t written
= 0;
370 mPipeOut
->Write(data
+ mBufferOffset
, mBufferRemaining
, &written
);
372 if (rv
== NS_BASE_STREAM_WOULD_BLOCK
) {
376 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Account for the bytes the pipe accepted (possibly fewer than requested).
380 MOZ_ASSERT(written
<= mBufferRemaining
);
381 mBufferRemaining
-= written
;
382 mBufferOffset
+= written
;
384 if (mBufferRemaining
== 0) {
390 nsresult rv
= mPipeOut
->AsyncWait(this, 0, 0, mOwningEventTarget
);
391 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Reports aValue to the console as a "ReadableStreamReadingFailed" error
// (category "ReadableStreamReader.read", string bundle eDOM_PROPERTIES).
//
// Source location is extracted with nsContentUtils::ExtractErrorValues() and
// the report is collected in a ConsoleReportCollector. It is flushed with the
// relevant inner window id: directly when on the main thread, or from a
// runnable dispatched to the main thread through the WorkerPrivate obtained
// from aCx.
398 void FetchStreamReader::ReportErrorToConsole(JSContext
* aCx
,
399 JS::Handle
<JS::Value
> aValue
) {
400 nsCString sourceSpec
;
403 nsString valueString
;
405 nsContentUtils::ExtractErrorValues(aCx
, aValue
, sourceSpec
, &line
, &column
,
// valueString is the single parameter of the localized message (presumably
// filled in by ExtractErrorValues() above - TODO confirm).
408 nsTArray
<nsString
> params
;
409 params
.AppendElement(valueString
);
411 RefPtr
<ConsoleReportCollector
> reporter
= new ConsoleReportCollector();
412 reporter
->AddConsoleReport(nsIScriptError::errorFlag
,
413 "ReadableStreamReader.read"_ns
,
414 nsContentUtils::eDOM_PROPERTIES
, sourceSpec
, line
,
415 column
, "ReadableStreamReadingFailed"_ns
, params
);
// Defaults to 0 when no window id is assigned below.
417 uint64_t innerWindowId
= 0;
// Main thread: take the window id from mGlobal and flush right away.
419 if (NS_IsMainThread()) {
420 nsCOMPtr
<nsPIDOMWindowInner
> window
= do_QueryInterface(mGlobal
);
422 innerWindowId
= window
->WindowID();
424 reporter
->FlushReportsToConsole(innerWindowId
);
// Otherwise: take the window id from the worker and flush from a runnable that
// is dispatched to the main thread.
428 WorkerPrivate
* workerPrivate
= GetWorkerPrivateFromContext(aCx
);
430 innerWindowId
= workerPrivate
->WindowID();
// The runnable captures the collector (by strong reference) and the id by
// value.
433 RefPtr
<Runnable
> r
= NS_NewRunnableFunction(
434 "FetchStreamReader::ReportErrorToConsole", [reporter
, innerWindowId
]() {
435 reporter
->FlushReportsToConsole(innerWindowId
);
438 workerPrivate
->DispatchToMainThread(r
.forget());
441 } // namespace mozilla::dom