Bug 1848468 - Mark k-rate-dynamics-compressor-connections.html subtest as failing...
[gecko.git] / dom / fetch / FetchStreamReader.cpp
blobde5a2cfefe3fba1436db567c438d7050e45b22a5
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "FetchStreamReader.h"
8 #include "InternalResponse.h"
9 #include "mozilla/ConsoleReportCollector.h"
10 #include "mozilla/ErrorResult.h"
11 #include "mozilla/StaticAnalysisFunctions.h"
12 #include "mozilla/dom/AutoEntryScript.h"
13 #include "mozilla/dom/Promise.h"
14 #include "mozilla/dom/PromiseBinding.h"
15 #include "mozilla/dom/ReadableStream.h"
16 #include "mozilla/dom/ReadableStreamDefaultController.h"
17 #include "mozilla/dom/ReadableStreamDefaultReader.h"
18 #include "mozilla/dom/WorkerPrivate.h"
19 #include "mozilla/dom/WorkerRef.h"
20 #include "mozilla/HoldDropJSObjects.h"
21 #include "mozilla/TaskCategory.h"
22 #include "nsContentUtils.h"
23 #include "nsDebug.h"
24 #include "nsIAsyncInputStream.h"
25 #include "nsIPipe.h"
26 #include "nsIScriptError.h"
27 #include "nsPIDOMWindow.h"
28 #include "jsapi.h"
30 namespace mozilla::dom {
// Cycle-collected reference counting for FetchStreamReader.
32 NS_IMPL_CYCLE_COLLECTING_ADDREF(FetchStreamReader)
33 NS_IMPL_CYCLE_COLLECTING_RELEASE(FetchStreamReader)
35 NS_IMPL_CYCLE_COLLECTION_CLASS(FetchStreamReader)
// Unlink: the members released when the cycle collector breaks a cycle.
37 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(FetchStreamReader)
38 NS_IMPL_CYCLE_COLLECTION_UNLINK(mGlobal)
39 NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader)
40 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
// Traverse: the same two members (mGlobal, mReader) are reported as edges.
42 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(FetchStreamReader)
43 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mGlobal)
44 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader)
45 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
// Trace: intentionally empty; no members are listed for tracing here.
47 NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(FetchStreamReader)
48 NS_IMPL_CYCLE_COLLECTION_TRACE_END
// QueryInterface map: the class is exposed as nsIOutputStreamCallback, and
// nsISupports is resolved through that same interface.
50 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchStreamReader)
51 NS_INTERFACE_MAP_ENTRY(nsIOutputStreamCallback)
52 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIOutputStreamCallback)
53 NS_INTERFACE_MAP_END
55 /* static */
56 nsresult FetchStreamReader::Create(JSContext* aCx, nsIGlobalObject* aGlobal,
57 FetchStreamReader** aStreamReader,
58 nsIInputStream** aInputStream) {
59 MOZ_ASSERT(aCx);
60 MOZ_ASSERT(aGlobal);
61 MOZ_ASSERT(aStreamReader);
62 MOZ_ASSERT(aInputStream);
64 RefPtr<FetchStreamReader> streamReader = new FetchStreamReader(aGlobal);
66 nsCOMPtr<nsIAsyncInputStream> pipeIn;
68 NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(streamReader->mPipeOut),
69 true, true, 0, 0);
71 if (!NS_IsMainThread()) {
72 WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
73 MOZ_ASSERT(workerPrivate);
75 RefPtr<StrongWorkerRef> workerRef = StrongWorkerRef::Create(
76 workerPrivate, "FetchStreamReader", [streamReader]() {
77 MOZ_ASSERT(streamReader);
79 // mAsyncWaitWorkerRef may keep the (same) StrongWorkerRef alive even
80 // when mWorkerRef has already been nulled out by a previous call to
81 // CloseAndRelease, we can just safely ignore this callback then
82 // (as would the CloseAndRelease do on a second call).
83 if (streamReader->mWorkerRef) {
84 streamReader->CloseAndRelease(
85 streamReader->mWorkerRef->Private()->GetJSContext(),
86 NS_ERROR_DOM_INVALID_STATE_ERR);
87 } else {
88 MOZ_DIAGNOSTIC_ASSERT(streamReader->mAsyncWaitWorkerRef);
90 });
92 if (NS_WARN_IF(!workerRef)) {
93 streamReader->mPipeOut->CloseWithStatus(NS_ERROR_DOM_INVALID_STATE_ERR);
94 return NS_ERROR_DOM_INVALID_STATE_ERR;
97 // These 2 objects create a ref-cycle here that is broken when the stream is
98 // closed or the worker shutsdown.
99 streamReader->mWorkerRef = std::move(workerRef);
102 pipeIn.forget(aInputStream);
103 streamReader.forget(aStreamReader);
104 return NS_OK;
107 FetchStreamReader::FetchStreamReader(nsIGlobalObject* aGlobal)
108 : mGlobal(aGlobal),
109 mOwningEventTarget(mGlobal->EventTargetFor(TaskCategory::Other)) {
110 MOZ_ASSERT(aGlobal);
112 mozilla::HoldJSObjects(this);
115 FetchStreamReader::~FetchStreamReader() {
116 CloseAndRelease(nullptr, NS_BASE_STREAM_CLOSED);
118 mozilla::DropJSObjects(this);
121 // If a context is provided, an attempt will be made to cancel the reader. The
122 // only situation where we don't expect to have a context is when closure is
123 // being triggered from the destructor or the WorkerRef is notifying. If
124 // we're at the destructor, it's far too late to cancel anything. And if the
125 // WorkerRef is being notified, the global is going away, so there's also
126 // no need to do further JS work.
127 void FetchStreamReader::CloseAndRelease(JSContext* aCx, nsresult aStatus) {
128 NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
130 if (mStreamClosed) {
131 // Already closed.
132 return;
135 RefPtr<FetchStreamReader> kungFuDeathGrip = this;
136 if (aCx && mReader) {
137 ErrorResult rv;
138 if (aStatus == NS_ERROR_DOM_WRONG_TYPE_ERR) {
139 rv.ThrowTypeError<MSG_FETCH_BODY_WRONG_TYPE>();
140 } else {
141 rv = aStatus;
143 JS::Rooted<JS::Value> errorValue(aCx);
144 if (ToJSValue(aCx, std::move(rv), &errorValue)) {
145 IgnoredErrorResult ignoredError;
146 // It's currently safe to cancel an already closed reader because, per the
147 // comments in ReadableStream::cancel() conveying the spec, step 2 of
148 // 3.4.3 that specified ReadableStreamCancel is: If stream.[[state]] is
149 // "closed", return a new promise resolved with undefined.
150 RefPtr<Promise> cancelResultPromise =
151 MOZ_KnownLive(mReader)->Cancel(aCx, errorValue, ignoredError);
152 NS_WARNING_ASSERTION(!ignoredError.Failed(),
153 "Failed to cancel stream during close and release");
154 if (cancelResultPromise) {
155 bool setHandled = cancelResultPromise->SetAnyPromiseIsHandled();
156 NS_WARNING_ASSERTION(setHandled,
157 "Failed to mark cancel promise as handled.");
158 (void)setHandled;
162 // We don't want to propagate exceptions during the cleanup.
163 JS_ClearPendingException(aCx);
166 mStreamClosed = true;
168 mGlobal = nullptr;
170 if (mPipeOut) {
171 mPipeOut->CloseWithStatus(aStatus);
173 mPipeOut = nullptr;
175 mWorkerRef = nullptr;
177 mReader = nullptr;
178 mBuffer.Clear();
181 // https://fetch.spec.whatwg.org/#body-incrementally-read
182 void FetchStreamReader::StartConsuming(JSContext* aCx, ReadableStream* aStream,
183 ErrorResult& aRv) {
184 MOZ_DIAGNOSTIC_ASSERT(!mReader);
185 MOZ_DIAGNOSTIC_ASSERT(aStream);
187 // Step 2: Let reader be the result of getting a reader for body’s stream.
188 RefPtr<ReadableStreamDefaultReader> reader = aStream->GetReader(aRv);
189 if (aRv.Failed()) {
190 CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
191 return;
194 mReader = reader;
196 mAsyncWaitWorkerRef = mWorkerRef;
197 aRv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);
198 if (NS_WARN_IF(aRv.Failed())) {
199 mAsyncWaitWorkerRef = nullptr;
200 CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
204 struct FetchReadRequest : public ReadRequest {
205 public:
206 NS_DECL_ISUPPORTS_INHERITED
207 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(FetchReadRequest, ReadRequest)
209 explicit FetchReadRequest(FetchStreamReader* aReader)
210 : mFetchStreamReader(aReader) {}
212 MOZ_CAN_RUN_SCRIPT_BOUNDARY
213 void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
214 ErrorResult& aRv) override {
215 mFetchStreamReader->ChunkSteps(aCx, aChunk, aRv);
218 MOZ_CAN_RUN_SCRIPT_BOUNDARY
219 void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {
220 mFetchStreamReader->CloseSteps(aCx, aRv);
223 MOZ_CAN_RUN_SCRIPT_BOUNDARY
224 void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
225 ErrorResult& aRv) override {
226 mFetchStreamReader->ErrorSteps(aCx, aError, aRv);
229 protected:
230 virtual ~FetchReadRequest() = default;
232 MOZ_KNOWN_LIVE RefPtr<FetchStreamReader> mFetchStreamReader;
// Cycle-collection and refcounting glue for FetchReadRequest. The only member
// listed here is mFetchStreamReader; AddRef/Release and the interface map are
// inherited from ReadRequest.
235 NS_IMPL_CYCLE_COLLECTION_INHERITED(FetchReadRequest, ReadRequest,
236 mFetchStreamReader)
237 NS_IMPL_ADDREF_INHERITED(FetchReadRequest, ReadRequest)
238 NS_IMPL_RELEASE_INHERITED(FetchReadRequest, ReadRequest)
239 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchReadRequest)
240 NS_INTERFACE_MAP_END_INHERITING(ReadRequest)
242 // nsIOutputStreamCallback interface
243 MOZ_CAN_RUN_SCRIPT_BOUNDARY
244 NS_IMETHODIMP
245 FetchStreamReader::OnOutputStreamReady(nsIAsyncOutputStream* aStream) {
246 NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
247 if (mStreamClosed) {
248 mAsyncWaitWorkerRef = nullptr;
249 return NS_OK;
252 AutoEntryScript aes(mGlobal, "ReadableStreamReader.read", !mWorkerRef);
253 if (!Process(aes.cx())) {
254 // We're done processing data, and haven't queued up a new AsyncWait - we
255 // can clear our mAsyncWaitWorkerRef.
256 mAsyncWaitWorkerRef = nullptr;
258 return NS_OK;
261 bool FetchStreamReader::Process(JSContext* aCx) {
262 NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
263 MOZ_ASSERT(mReader);
265 if (!mBuffer.IsEmpty()) {
266 nsresult rv = WriteBuffer();
267 if (NS_WARN_IF(NS_FAILED(rv))) {
268 CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
269 return false;
271 return true;
274 // Check if the output stream has already been closed. This lets us propagate
275 // errors eagerly, and detect output stream closures even when we have no data
276 // to write.
277 if (NS_WARN_IF(NS_FAILED(mPipeOut->StreamStatus()))) {
278 CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
279 return false;
282 // We're waiting on new data - set up a WAIT_CLOSURE_ONLY callback so we
283 // notice if the reader closes.
284 nsresult rv = mPipeOut->AsyncWait(
285 this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY, 0, mOwningEventTarget);
286 if (NS_WARN_IF(NS_FAILED(rv))) {
287 CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
288 return false;
291 // If we already have an outstanding read request, don't start another one
292 // concurrently.
293 if (!mHasOutstandingReadRequest) {
294 // https://fetch.spec.whatwg.org/#incrementally-read-loop
295 // The below very loosely tries to implement the incrementally-read-loop
296 // from the fetch spec.
297 // Step 2: Read a chunk from reader given readRequest.
298 RefPtr<ReadRequest> readRequest = new FetchReadRequest(this);
299 RefPtr<ReadableStreamDefaultReader> reader = mReader;
300 mHasOutstandingReadRequest = true;
302 IgnoredErrorResult err;
303 reader->ReadChunk(aCx, *readRequest, err);
304 if (NS_WARN_IF(err.Failed())) {
305 // Let's close the stream.
306 mHasOutstandingReadRequest = false;
307 CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
308 // Don't return false, as we've already called `AsyncWait`.
311 return true;
314 void FetchStreamReader::ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
315 ErrorResult& aRv) {
316 // This roughly implements the chunk steps from
317 // https://fetch.spec.whatwg.org/#incrementally-read-loop.
319 mHasOutstandingReadRequest = false;
321 // Step 2. If chunk is not a Uint8Array object, then set continueAlgorithm to
322 // this step: run processBodyError given a TypeError.
323 RootedSpiderMonkeyInterface<Uint8Array> chunk(aCx);
324 if (!aChunk.isObject() || !chunk.Init(&aChunk.toObject())) {
325 CloseAndRelease(aCx, NS_ERROR_DOM_WRONG_TYPE_ERR);
326 return;
328 chunk.ComputeState();
330 MOZ_DIAGNOSTIC_ASSERT(mBuffer.IsEmpty());
332 // Let's take a copy of the data.
333 // FIXME: We could sometimes avoid this copy by trying to write `chunk`
334 // directly into `mPipeOut` eagerly, and only filling `mBuffer` if there isn't
335 // enough space in the pipe's buffer.
336 if (!mBuffer.AppendElements(chunk.Data(), chunk.Length(), fallible)) {
337 CloseAndRelease(aCx, NS_ERROR_OUT_OF_MEMORY);
338 return;
341 mBufferOffset = 0;
342 mBufferRemaining = chunk.Length();
344 nsresult rv = WriteBuffer();
345 if (NS_WARN_IF(NS_FAILED(rv))) {
346 CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
350 void FetchStreamReader::CloseSteps(JSContext* aCx, ErrorResult& aRv) {
351 mHasOutstandingReadRequest = false;
352 CloseAndRelease(aCx, NS_BASE_STREAM_CLOSED);
355 void FetchStreamReader::ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
356 ErrorResult& aRv) {
357 mHasOutstandingReadRequest = false;
358 ReportErrorToConsole(aCx, aError);
359 CloseAndRelease(aCx, NS_ERROR_FAILURE);
362 nsresult FetchStreamReader::WriteBuffer() {
363 MOZ_ASSERT(mBuffer.Length() == (mBufferOffset + mBufferRemaining));
365 char* data = reinterpret_cast<char*>(mBuffer.Elements());
367 while (mBufferRemaining > 0) {
368 uint32_t written = 0;
369 nsresult rv =
370 mPipeOut->Write(data + mBufferOffset, mBufferRemaining, &written);
372 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
373 break;
376 if (NS_WARN_IF(NS_FAILED(rv))) {
377 return rv;
380 MOZ_ASSERT(written <= mBufferRemaining);
381 mBufferRemaining -= written;
382 mBufferOffset += written;
384 if (mBufferRemaining == 0) {
385 mBuffer.Clear();
386 break;
390 nsresult rv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);
391 if (NS_WARN_IF(NS_FAILED(rv))) {
392 return rv;
395 return NS_OK;
398 void FetchStreamReader::ReportErrorToConsole(JSContext* aCx,
399 JS::Handle<JS::Value> aValue) {
400 nsCString sourceSpec;
401 uint32_t line = 0;
402 uint32_t column = 0;
403 nsString valueString;
405 nsContentUtils::ExtractErrorValues(aCx, aValue, sourceSpec, &line, &column,
406 valueString);
408 nsTArray<nsString> params;
409 params.AppendElement(valueString);
411 RefPtr<ConsoleReportCollector> reporter = new ConsoleReportCollector();
412 reporter->AddConsoleReport(nsIScriptError::errorFlag,
413 "ReadableStreamReader.read"_ns,
414 nsContentUtils::eDOM_PROPERTIES, sourceSpec, line,
415 column, "ReadableStreamReadingFailed"_ns, params);
417 uint64_t innerWindowId = 0;
419 if (NS_IsMainThread()) {
420 nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal);
421 if (window) {
422 innerWindowId = window->WindowID();
424 reporter->FlushReportsToConsole(innerWindowId);
425 return;
428 WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
429 if (workerPrivate) {
430 innerWindowId = workerPrivate->WindowID();
433 RefPtr<Runnable> r = NS_NewRunnableFunction(
434 "FetchStreamReader::ReportErrorToConsole", [reporter, innerWindowId]() {
435 reporter->FlushReportsToConsole(innerWindowId);
438 workerPrivate->DispatchToMainThread(r.forget());
441 } // namespace mozilla::dom