Backed out 3 changesets (bug 1877678, bug 1849175) for causing failures on browser_op...
[gecko.git] / xpcom / io / nsMultiplexInputStream.cpp
blobbc8a67ed234eb85947dca720017dadb5bc7c0349
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 /**
8 * The multiplex stream concatenates a list of input streams into a single
9 * stream.
12 #include "mozilla/Attributes.h"
13 #include "mozilla/CheckedInt.h"
14 #include "mozilla/MathAlgorithms.h"
15 #include "mozilla/Mutex.h"
17 #include "base/basictypes.h"
19 #include "nsMultiplexInputStream.h"
20 #include "nsIBufferedStreams.h"
21 #include "nsICloneableInputStream.h"
22 #include "nsIMultiplexInputStream.h"
23 #include "nsISeekableStream.h"
24 #include "nsCOMPtr.h"
25 #include "nsCOMArray.h"
26 #include "nsIClassInfoImpl.h"
27 #include "nsIIPCSerializableInputStream.h"
28 #include "mozilla/ipc/InputStreamUtils.h"
29 #include "nsIAsyncInputStream.h"
30 #include "nsIInputStreamLength.h"
31 #include "nsNetUtil.h"
32 #include "nsStreamUtils.h"
34 using namespace mozilla;
35 using namespace mozilla::ipc;
37 using mozilla::DeprecatedAbs;
39 class nsMultiplexInputStream final : public nsIMultiplexInputStream,
40 public nsISeekableStream,
41 public nsIIPCSerializableInputStream,
42 public nsICloneableInputStream,
43 public nsIAsyncInputStream,
44 public nsIInputStreamCallback,
45 public nsIInputStreamLength,
46 public nsIAsyncInputStreamLength {
47 public:
48 nsMultiplexInputStream();
50 NS_DECL_THREADSAFE_ISUPPORTS
51 NS_DECL_NSIINPUTSTREAM
52 NS_DECL_NSIMULTIPLEXINPUTSTREAM
53 NS_DECL_NSISEEKABLESTREAM
54 NS_DECL_NSITELLABLESTREAM
55 NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
56 NS_DECL_NSICLONEABLEINPUTSTREAM
57 NS_DECL_NSIASYNCINPUTSTREAM
58 NS_DECL_NSIINPUTSTREAMCALLBACK
59 NS_DECL_NSIINPUTSTREAMLENGTH
60 NS_DECL_NSIASYNCINPUTSTREAMLENGTH
62 // This is used for nsIAsyncInputStream::AsyncWait
63 void AsyncWaitCompleted();
65 // This is used for nsIAsyncInputStreamLength::AsyncLengthWait
66 void AsyncWaitCompleted(int64_t aLength, const MutexAutoLock& aProofOfLock)
67 MOZ_REQUIRES(mLock);
69 struct StreamData {
70 nsresult Initialize(nsIInputStream* aOriginalStream) {
71 mCurrentPos = 0;
73 mOriginalStream = aOriginalStream;
75 mBufferedStream = aOriginalStream;
76 if (!NS_InputStreamIsBuffered(mBufferedStream)) {
77 nsCOMPtr<nsIInputStream> bufferedStream;
78 nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(bufferedStream),
79 mBufferedStream.forget(), 4096);
80 NS_ENSURE_SUCCESS(rv, rv);
81 mBufferedStream = bufferedStream;
84 mAsyncStream = do_QueryInterface(mBufferedStream);
85 mSeekableStream = do_QueryInterface(mBufferedStream);
87 return NS_OK;
90 nsCOMPtr<nsIInputStream> mOriginalStream;
92 // Equal to mOriginalStream or a wrap around the original stream to make it
93 // buffered.
94 nsCOMPtr<nsIInputStream> mBufferedStream;
96 // This can be null.
97 nsCOMPtr<nsIAsyncInputStream> mAsyncStream;
98 // This can be null.
99 nsCOMPtr<nsISeekableStream> mSeekableStream;
101 uint64_t mCurrentPos;
104 Mutex& GetLock() MOZ_RETURN_CAPABILITY(mLock) { return mLock; }
106 private:
107 ~nsMultiplexInputStream() = default;
109 void NextStream() MOZ_REQUIRES(mLock) {
110 ++mCurrentStream;
111 mStartedReadingCurrent = false;
114 nsresult AsyncWaitInternal();
116 // This method updates mSeekableStreams, mTellableStreams,
117 // mIPCSerializableStreams and mCloneableStreams values.
118 void UpdateQIMap(StreamData& aStream) MOZ_REQUIRES(mLock);
120 struct MOZ_STACK_CLASS ReadSegmentsState {
121 nsCOMPtr<nsIInputStream> mThisStream;
122 uint32_t mOffset;
123 nsWriteSegmentFun mWriter;
124 void* mClosure;
125 bool mDone;
128 void SerializedComplexityInternal(uint32_t aMaxSize, uint32_t* aSizeUsed,
129 uint32_t* aPipes, uint32_t* aTransferables,
130 bool* aSerializeAsPipe);
132 static nsresult ReadSegCb(nsIInputStream* aIn, void* aClosure,
133 const char* aFromRawSegment, uint32_t aToOffset,
134 uint32_t aCount, uint32_t* aWriteCount);
136 bool IsSeekable() const;
137 bool IsIPCSerializable() const;
138 bool IsCloneable() const;
139 bool IsAsyncInputStream() const;
140 bool IsInputStreamLength() const;
141 bool IsAsyncInputStreamLength() const;
143 Mutex mLock; // Protects access to all data members.
145 nsTArray<StreamData> mStreams MOZ_GUARDED_BY(mLock);
147 uint32_t mCurrentStream MOZ_GUARDED_BY(mLock);
148 bool mStartedReadingCurrent MOZ_GUARDED_BY(mLock);
149 nsresult mStatus MOZ_GUARDED_BY(mLock);
150 nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback MOZ_GUARDED_BY(mLock);
151 uint32_t mAsyncWaitFlags MOZ_GUARDED_BY(mLock);
152 uint32_t mAsyncWaitRequestedCount MOZ_GUARDED_BY(mLock);
153 nsCOMPtr<nsIEventTarget> mAsyncWaitEventTarget MOZ_GUARDED_BY(mLock);
154 nsCOMPtr<nsIInputStreamLengthCallback> mAsyncWaitLengthCallback
155 MOZ_GUARDED_BY(mLock);
157 class AsyncWaitLengthHelper;
158 RefPtr<AsyncWaitLengthHelper> mAsyncWaitLengthHelper MOZ_GUARDED_BY(mLock);
160 uint32_t mSeekableStreams MOZ_GUARDED_BY(mLock);
161 uint32_t mIPCSerializableStreams MOZ_GUARDED_BY(mLock);
162 uint32_t mCloneableStreams MOZ_GUARDED_BY(mLock);
164 // These are Atomics so that we can check them in QueryInterface without
165 // taking a lock (to look at mStreams.Length() and the numbers above)
166 // With no streams added yet, all of these are possible
167 Atomic<bool, Relaxed> mIsSeekableStream{true};
168 Atomic<bool, Relaxed> mIsIPCSerializableStream{true};
169 Atomic<bool, Relaxed> mIsCloneableStream{true};
171 Atomic<bool, Relaxed> mIsAsyncInputStream{false};
172 Atomic<bool, Relaxed> mIsInputStreamLength{false};
173 Atomic<bool, Relaxed> mIsAsyncInputStreamLength{false};
176 NS_IMPL_ADDREF(nsMultiplexInputStream)
177 NS_IMPL_RELEASE(nsMultiplexInputStream)
179 NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
180 NS_MULTIPLEXINPUTSTREAM_CID)
182 NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream)
183 NS_INTERFACE_MAP_ENTRY(nsIMultiplexInputStream)
184 NS_INTERFACE_MAP_ENTRY(nsIInputStream)
185 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, IsSeekable())
186 NS_INTERFACE_MAP_ENTRY(nsITellableStream)
187 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
188 IsIPCSerializable())
189 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream, IsCloneable())
190 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, IsAsyncInputStream())
191 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback,
192 IsAsyncInputStream())
193 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength,
194 IsInputStreamLength())
195 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength,
196 IsAsyncInputStreamLength())
197 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIMultiplexInputStream)
198 NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream)
199 NS_INTERFACE_MAP_END
201 NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream, nsIMultiplexInputStream,
202 nsIInputStream, nsISeekableStream,
203 nsITellableStream)
205 static nsresult AvailableMaybeSeek(nsMultiplexInputStream::StreamData& aStream,
206 uint64_t* aResult) {
207 nsresult rv = aStream.mBufferedStream->Available(aResult);
208 if (rv == NS_BASE_STREAM_CLOSED) {
209 // Blindly seek to the current position if Available() returns
210 // NS_BASE_STREAM_CLOSED.
211 // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
212 // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
213 if (aStream.mSeekableStream) {
214 nsresult rvSeek =
215 aStream.mSeekableStream->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
216 if (NS_SUCCEEDED(rvSeek)) {
217 rv = aStream.mBufferedStream->Available(aResult);
221 return rv;
224 nsMultiplexInputStream::nsMultiplexInputStream()
225 : mLock("nsMultiplexInputStream lock"),
226 mCurrentStream(0),
227 mStartedReadingCurrent(false),
228 mStatus(NS_OK),
229 mAsyncWaitFlags(0),
230 mAsyncWaitRequestedCount(0),
231 mSeekableStreams(0),
232 mIPCSerializableStreams(0),
233 mCloneableStreams(0) {}
235 NS_IMETHODIMP
236 nsMultiplexInputStream::GetCount(uint32_t* aCount) {
237 MutexAutoLock lock(mLock);
238 *aCount = mStreams.Length();
239 return NS_OK;
242 NS_IMETHODIMP
243 nsMultiplexInputStream::AppendStream(nsIInputStream* aStream) {
244 MutexAutoLock lock(mLock);
246 StreamData* streamData = mStreams.AppendElement(fallible);
247 if (NS_WARN_IF(!streamData)) {
248 return NS_ERROR_OUT_OF_MEMORY;
251 nsresult rv = streamData->Initialize(aStream);
252 NS_ENSURE_SUCCESS(rv, rv);
254 UpdateQIMap(*streamData);
256 if (mStatus == NS_BASE_STREAM_CLOSED) {
257 // We were closed, but now we have more data to read.
258 mStatus = NS_OK;
261 return NS_OK;
264 NS_IMETHODIMP
265 nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream** aResult) {
266 MutexAutoLock lock(mLock);
268 if (aIndex >= mStreams.Length()) {
269 return NS_ERROR_NOT_AVAILABLE;
272 StreamData& streamData = mStreams.ElementAt(aIndex);
273 nsCOMPtr<nsIInputStream> stream = streamData.mOriginalStream;
274 stream.forget(aResult);
275 return NS_OK;
278 NS_IMETHODIMP
279 nsMultiplexInputStream::Close() {
280 nsTArray<nsCOMPtr<nsIInputStream>> streams;
282 // Let's take a copy of the streams becuase, calling close() it could trigger
283 // a nsIInputStreamCallback immediately and we don't want to create a deadlock
284 // with mutex.
286 MutexAutoLock lock(mLock);
287 uint32_t len = mStreams.Length();
288 for (uint32_t i = 0; i < len; ++i) {
289 if (NS_WARN_IF(
290 !streams.AppendElement(mStreams[i].mBufferedStream, fallible))) {
291 mStatus = NS_BASE_STREAM_CLOSED;
292 return NS_ERROR_OUT_OF_MEMORY;
295 mStatus = NS_BASE_STREAM_CLOSED;
298 nsresult rv = NS_OK;
300 uint32_t len = streams.Length();
301 for (uint32_t i = 0; i < len; ++i) {
302 nsresult rv2 = streams[i]->Close();
303 // We still want to close all streams, but we should return an error
304 if (NS_FAILED(rv2)) {
305 rv = rv2;
309 return rv;
312 NS_IMETHODIMP
313 nsMultiplexInputStream::Available(uint64_t* aResult) {
314 *aResult = 0;
316 MutexAutoLock lock(mLock);
317 if (NS_FAILED(mStatus)) {
318 return mStatus;
321 uint64_t avail = 0;
322 nsresult rv = NS_BASE_STREAM_CLOSED;
324 uint32_t len = mStreams.Length();
325 for (uint32_t i = mCurrentStream; i < len; i++) {
326 uint64_t streamAvail;
327 rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
328 if (rv == NS_BASE_STREAM_CLOSED) {
329 // If a stream is closed, we continue with the next one.
330 // If this is the current stream we move to the following stream.
331 if (mCurrentStream == i) {
332 NextStream();
335 // If this is the last stream, we want to return this error code.
336 continue;
339 if (NS_WARN_IF(NS_FAILED(rv))) {
340 mStatus = rv;
341 return mStatus;
344 // If the current stream is async, we have to return what we have so far
345 // without processing the following streams. This is needed because
346 // ::Available should return only what is currently available. In case of an
347 // nsIAsyncInputStream, we have to call AsyncWait() in order to read more.
348 if (mStreams[i].mAsyncStream) {
349 avail += streamAvail;
350 break;
353 if (streamAvail == 0) {
354 // Nothing to read for this stream. Let's move to the next one.
355 continue;
358 avail += streamAvail;
361 // We still have something to read. We don't want to return an error code yet.
362 if (avail) {
363 *aResult = avail;
364 return NS_OK;
367 // Let's propagate the last error message.
368 mStatus = rv;
369 return rv;
372 NS_IMETHODIMP
373 nsMultiplexInputStream::StreamStatus() {
374 MutexAutoLock lock(mLock);
375 return mStatus;
378 NS_IMETHODIMP
379 nsMultiplexInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
380 MutexAutoLock lock(mLock);
381 // It is tempting to implement this method in terms of ReadSegments, but
382 // that would prevent this class from being used with streams that only
383 // implement Read (e.g., file streams).
385 *aResult = 0;
387 if (mStatus == NS_BASE_STREAM_CLOSED) {
388 return NS_OK;
390 if (NS_FAILED(mStatus)) {
391 return mStatus;
394 nsresult rv = NS_OK;
396 uint32_t len = mStreams.Length();
397 while (mCurrentStream < len && aCount) {
398 uint32_t read;
399 rv = mStreams[mCurrentStream].mBufferedStream->Read(aBuf, aCount, &read);
401 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
402 // (This is a bug in those stream implementations)
403 if (rv == NS_BASE_STREAM_CLOSED) {
404 MOZ_ASSERT_UNREACHABLE(
405 "Input stream's Read method returned "
406 "NS_BASE_STREAM_CLOSED");
407 rv = NS_OK;
408 read = 0;
409 } else if (NS_FAILED(rv)) {
410 break;
413 if (read == 0) {
414 NextStream();
415 } else {
416 NS_ASSERTION(aCount >= read, "Read more than requested");
417 *aResult += read;
418 aCount -= read;
419 aBuf += read;
420 mStartedReadingCurrent = true;
422 mStreams[mCurrentStream].mCurrentPos += read;
425 return *aResult ? NS_OK : rv;
428 NS_IMETHODIMP
429 nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
430 uint32_t aCount, uint32_t* aResult) {
431 MutexAutoLock lock(mLock);
433 if (mStatus == NS_BASE_STREAM_CLOSED) {
434 *aResult = 0;
435 return NS_OK;
437 if (NS_FAILED(mStatus)) {
438 return mStatus;
441 NS_ASSERTION(aWriter, "missing aWriter");
443 nsresult rv = NS_OK;
444 ReadSegmentsState state;
445 state.mThisStream = this;
446 state.mOffset = 0;
447 state.mWriter = aWriter;
448 state.mClosure = aClosure;
449 state.mDone = false;
451 uint32_t len = mStreams.Length();
452 while (mCurrentStream < len && aCount) {
453 uint32_t read;
454 rv = mStreams[mCurrentStream].mBufferedStream->ReadSegments(
455 ReadSegCb, &state, aCount, &read);
457 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
458 // (This is a bug in those stream implementations)
459 if (rv == NS_BASE_STREAM_CLOSED) {
460 MOZ_ASSERT_UNREACHABLE(
461 "Input stream's Read method returned "
462 "NS_BASE_STREAM_CLOSED");
463 rv = NS_OK;
464 read = 0;
467 // if |aWriter| decided to stop reading segments...
468 if (state.mDone || NS_FAILED(rv)) {
469 break;
472 // if stream is empty, then advance to the next stream.
473 if (read == 0) {
474 NextStream();
475 } else {
476 NS_ASSERTION(aCount >= read, "Read more than requested");
477 state.mOffset += read;
478 aCount -= read;
479 mStartedReadingCurrent = true;
481 mStreams[mCurrentStream].mCurrentPos += read;
485 // if we successfully read some data, then this call succeeded.
486 *aResult = state.mOffset;
487 return state.mOffset ? NS_OK : rv;
490 nsresult nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
491 const char* aFromRawSegment,
492 uint32_t aToOffset, uint32_t aCount,
493 uint32_t* aWriteCount) {
494 nsresult rv;
495 ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
496 rv = (state->mWriter)(state->mThisStream, state->mClosure, aFromRawSegment,
497 aToOffset + state->mOffset, aCount, aWriteCount);
498 if (NS_FAILED(rv)) {
499 state->mDone = true;
501 return rv;
504 NS_IMETHODIMP
505 nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking) {
506 MutexAutoLock lock(mLock);
508 uint32_t len = mStreams.Length();
509 if (len == 0) {
510 // Claim to be non-blocking, since we won't block the caller.
511 *aNonBlocking = true;
512 return NS_OK;
515 for (uint32_t i = 0; i < len; ++i) {
516 nsresult rv = mStreams[i].mBufferedStream->IsNonBlocking(aNonBlocking);
517 if (NS_WARN_IF(NS_FAILED(rv))) {
518 return rv;
520 // If one is blocking the entire stream becomes blocking.
521 if (!*aNonBlocking) {
522 return NS_OK;
526 return NS_OK;
529 NS_IMETHODIMP
530 nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset) {
531 MutexAutoLock lock(mLock);
533 if (NS_FAILED(mStatus)) {
534 return mStatus;
537 nsresult rv;
539 uint32_t oldCurrentStream = mCurrentStream;
540 bool oldStartedReadingCurrent = mStartedReadingCurrent;
542 if (aWhence == NS_SEEK_SET) {
543 int64_t remaining = aOffset;
544 if (aOffset == 0) {
545 mCurrentStream = 0;
547 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
548 nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
549 if (!stream) {
550 return NS_ERROR_FAILURE;
553 // See if all remaining streams should be rewound
554 if (remaining == 0) {
555 if (i < oldCurrentStream ||
556 (i == oldCurrentStream && oldStartedReadingCurrent)) {
557 rv = stream->Seek(NS_SEEK_SET, 0);
558 if (NS_WARN_IF(NS_FAILED(rv))) {
559 return rv;
562 mStreams[i].mCurrentPos = 0;
563 continue;
564 } else {
565 break;
569 // Get position in the current stream
570 int64_t streamPos;
571 if (i > oldCurrentStream ||
572 (i == oldCurrentStream && !oldStartedReadingCurrent)) {
573 streamPos = 0;
574 } else {
575 streamPos = mStreams[i].mCurrentPos;
578 // See if we need to seek the current stream forward or backward
579 if (remaining < streamPos) {
580 rv = stream->Seek(NS_SEEK_SET, remaining);
581 if (NS_WARN_IF(NS_FAILED(rv))) {
582 return rv;
585 mStreams[i].mCurrentPos = remaining;
586 mCurrentStream = i;
587 mStartedReadingCurrent = remaining != 0;
589 remaining = 0;
590 } else if (remaining > streamPos) {
591 if (i < oldCurrentStream) {
592 // We're already at end so no need to seek this stream
593 remaining -= streamPos;
594 NS_ASSERTION(remaining >= 0, "Remaining invalid");
595 } else {
596 uint64_t avail;
597 rv = AvailableMaybeSeek(mStreams[i], &avail);
598 if (NS_WARN_IF(NS_FAILED(rv))) {
599 return rv;
602 int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);
604 rv = stream->Seek(NS_SEEK_SET, newPos);
605 if (NS_WARN_IF(NS_FAILED(rv))) {
606 return rv;
609 mStreams[i].mCurrentPos = newPos;
610 mCurrentStream = i;
611 mStartedReadingCurrent = true;
613 remaining -= newPos;
614 NS_ASSERTION(remaining >= 0, "Remaining invalid");
616 } else {
617 NS_ASSERTION(remaining == streamPos, "Huh?");
618 MOZ_ASSERT(remaining != 0, "Zero remaining should be handled earlier");
619 remaining = 0;
620 mCurrentStream = i;
621 mStartedReadingCurrent = true;
625 return NS_OK;
628 if (aWhence == NS_SEEK_CUR && aOffset > 0) {
629 int64_t remaining = aOffset;
630 for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
631 uint64_t avail;
632 rv = AvailableMaybeSeek(mStreams[i], &avail);
633 if (NS_WARN_IF(NS_FAILED(rv))) {
634 return rv;
637 int64_t seek = XPCOM_MIN((int64_t)avail, remaining);
639 rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, seek);
640 if (NS_WARN_IF(NS_FAILED(rv))) {
641 return rv;
644 mStreams[i].mCurrentPos += seek;
645 mCurrentStream = i;
646 mStartedReadingCurrent = true;
648 remaining -= seek;
651 return NS_OK;
654 if (aWhence == NS_SEEK_CUR && aOffset < 0) {
655 int64_t remaining = -aOffset;
656 for (uint32_t i = mCurrentStream; remaining && i != (uint32_t)-1; --i) {
657 int64_t pos = mStreams[i].mCurrentPos;
659 int64_t seek = XPCOM_MIN(pos, remaining);
661 rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, -seek);
662 if (NS_WARN_IF(NS_FAILED(rv))) {
663 return rv;
666 mStreams[i].mCurrentPos -= seek;
667 mCurrentStream = i;
668 mStartedReadingCurrent = seek != -pos;
670 remaining -= seek;
673 return NS_OK;
676 if (aWhence == NS_SEEK_CUR) {
677 NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");
679 return NS_OK;
682 if (aWhence == NS_SEEK_END) {
683 if (aOffset > 0) {
684 return NS_ERROR_INVALID_ARG;
687 int64_t remaining = aOffset;
688 int32_t i;
689 for (i = mStreams.Length() - 1; i >= 0; --i) {
690 nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
692 uint64_t avail;
693 rv = AvailableMaybeSeek(mStreams[i], &avail);
694 if (NS_WARN_IF(NS_FAILED(rv))) {
695 return rv;
698 int64_t streamLength = avail + mStreams[i].mCurrentPos;
700 // The seek(END) can be completed in the current stream.
701 if (streamLength >= DeprecatedAbs(remaining)) {
702 rv = stream->Seek(NS_SEEK_END, remaining);
703 if (NS_WARN_IF(NS_FAILED(rv))) {
704 return rv;
707 mStreams[i].mCurrentPos = streamLength + remaining;
708 mCurrentStream = i;
709 mStartedReadingCurrent = true;
710 break;
713 // We are at the beginning of this stream.
714 rv = stream->Seek(NS_SEEK_SET, 0);
715 if (NS_WARN_IF(NS_FAILED(rv))) {
716 return rv;
719 remaining += streamLength;
720 mStreams[i].mCurrentPos = 0;
723 // Any other stream must be set to the end.
724 for (--i; i >= 0; --i) {
725 nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
727 uint64_t avail;
728 rv = AvailableMaybeSeek(mStreams[i], &avail);
729 if (NS_WARN_IF(NS_FAILED(rv))) {
730 return rv;
733 int64_t streamLength = avail + mStreams[i].mCurrentPos;
735 rv = stream->Seek(NS_SEEK_END, 0);
736 if (NS_WARN_IF(NS_FAILED(rv))) {
737 return rv;
740 mStreams[i].mCurrentPos = streamLength;
743 return NS_OK;
746 // other Seeks not implemented yet
747 return NS_ERROR_NOT_IMPLEMENTED;
750 NS_IMETHODIMP
751 nsMultiplexInputStream::Tell(int64_t* aResult) {
752 MutexAutoLock lock(mLock);
754 if (NS_FAILED(mStatus)) {
755 return mStatus;
758 int64_t ret64 = 0;
759 #ifdef DEBUG
760 bool zeroFound = false;
761 #endif
763 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
764 ret64 += mStreams[i].mCurrentPos;
766 #ifdef DEBUG
767 // When we see 1 stream with currentPos = 0, all the remaining streams must
768 // be set to 0 as well.
769 MOZ_ASSERT_IF(zeroFound, mStreams[i].mCurrentPos == 0);
770 if (mStreams[i].mCurrentPos == 0) {
771 zeroFound = true;
773 #endif
775 *aResult = ret64;
777 return NS_OK;
780 NS_IMETHODIMP
781 nsMultiplexInputStream::SetEOF() { return NS_ERROR_NOT_IMPLEMENTED; }
783 NS_IMETHODIMP
784 nsMultiplexInputStream::CloseWithStatus(nsresult aStatus) { return Close(); }
786 // This class is used to inform nsMultiplexInputStream that it's time to execute
787 // the asyncWait callback.
788 class AsyncWaitRunnable final : public DiscardableRunnable {
789 RefPtr<nsMultiplexInputStream> mStream;
791 public:
792 static void Create(nsMultiplexInputStream* aStream,
793 nsIEventTarget* aEventTarget) {
794 RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(aStream);
795 if (aEventTarget) {
796 aEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
797 } else {
798 runnable->Run();
802 NS_IMETHOD
803 Run() override {
804 mStream->AsyncWaitCompleted();
805 return NS_OK;
808 private:
809 explicit AsyncWaitRunnable(nsMultiplexInputStream* aStream)
810 : DiscardableRunnable("AsyncWaitRunnable"), mStream(aStream) {
811 MOZ_ASSERT(aStream);
815 NS_IMETHODIMP
816 nsMultiplexInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
817 uint32_t aFlags, uint32_t aRequestedCount,
818 nsIEventTarget* aEventTarget) {
820 MutexAutoLock lock(mLock);
822 // We must execute the callback also when the stream is closed.
823 if (NS_FAILED(mStatus) && mStatus != NS_BASE_STREAM_CLOSED) {
824 return mStatus;
827 if (NS_WARN_IF(mAsyncWaitCallback && aCallback &&
828 mAsyncWaitCallback != aCallback)) {
829 return NS_ERROR_FAILURE;
832 mAsyncWaitCallback = aCallback;
833 mAsyncWaitFlags = aFlags;
834 mAsyncWaitRequestedCount = aRequestedCount;
835 mAsyncWaitEventTarget = aEventTarget;
838 return AsyncWaitInternal();
841 nsresult nsMultiplexInputStream::AsyncWaitInternal() {
842 nsCOMPtr<nsIAsyncInputStream> stream;
843 nsIInputStreamCallback* asyncWaitCallback = nullptr;
844 uint32_t asyncWaitFlags = 0;
845 uint32_t asyncWaitRequestedCount = 0;
846 nsCOMPtr<nsIEventTarget> asyncWaitEventTarget;
849 MutexAutoLock lock(mLock);
851 // Let's take the first async stream if we are not already closed, and if
852 // it has data to read or if it async.
853 if (mStatus != NS_BASE_STREAM_CLOSED) {
854 for (; mCurrentStream < mStreams.Length(); NextStream()) {
855 stream = mStreams[mCurrentStream].mAsyncStream;
856 if (stream) {
857 break;
860 uint64_t avail = 0;
861 nsresult rv = AvailableMaybeSeek(mStreams[mCurrentStream], &avail);
862 if (rv == NS_BASE_STREAM_CLOSED || (NS_SUCCEEDED(rv) && avail == 0)) {
863 // Nothing to read here. Let's move on.
864 continue;
867 if (NS_FAILED(rv)) {
868 return rv;
871 break;
875 asyncWaitCallback = mAsyncWaitCallback ? this : nullptr;
876 asyncWaitFlags = mAsyncWaitFlags;
877 asyncWaitRequestedCount = mAsyncWaitRequestedCount;
878 asyncWaitEventTarget = mAsyncWaitEventTarget;
880 MOZ_ASSERT_IF(stream, NS_SUCCEEDED(mStatus));
883 // If we are here it's because we are already closed, or if the current stream
884 // is not async. In both case we have to execute the callback.
885 if (!stream) {
886 if (asyncWaitCallback) {
887 AsyncWaitRunnable::Create(this, asyncWaitEventTarget);
889 return NS_OK;
892 return stream->AsyncWait(asyncWaitCallback, asyncWaitFlags,
893 asyncWaitRequestedCount, asyncWaitEventTarget);
896 NS_IMETHODIMP
897 nsMultiplexInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
898 nsCOMPtr<nsIInputStreamCallback> callback;
900 // When OnInputStreamReady is called, we could be in 2 scenarios:
901 // a. there is something to read;
902 // b. the stream is closed.
903 // But if the stream is closed and it was not the last one, we must proceed
904 // with the following stream in order to have something to read by the callee.
907 MutexAutoLock lock(mLock);
909 // The callback has been nullified in the meantime.
910 if (!mAsyncWaitCallback) {
911 return NS_OK;
914 if (NS_SUCCEEDED(mStatus)) {
915 uint64_t avail = 0;
916 nsresult rv = NS_OK;
917 // Only check `Available()` if `aStream` is actually the current stream,
918 // otherwise we'll always want to re-poll, as we got the callback for the
919 // wrong stream.
920 if (mCurrentStream < mStreams.Length() &&
921 aStream == mStreams[mCurrentStream].mAsyncStream) {
922 rv = aStream->Available(&avail);
924 if (rv == NS_BASE_STREAM_CLOSED || (NS_SUCCEEDED(rv) && avail == 0)) {
925 // This stream is either closed, has no data available, or is not the
926 // current stream. If it is closed and current, move to the next stream,
927 // otherwise re-wait on the current stream until it has data available
928 // or becomes closed.
929 // Unlike streams not implementing nsIAsyncInputStream, async streams
930 // cannot use `Available() == 0` to indicate EOF, so we re-poll in that
931 // situation.
932 if (NS_FAILED(rv)) {
933 NextStream();
936 // Unlock and invoke AsyncWaitInternal to wait again. If this succeeds,
937 // we'll be called again, otherwise fall through and notify.
938 MutexAutoUnlock unlock(mLock);
939 if (NS_SUCCEEDED(AsyncWaitInternal())) {
940 return NS_OK;
945 mAsyncWaitCallback.swap(callback);
946 mAsyncWaitEventTarget = nullptr;
949 return callback ? callback->OnInputStreamReady(this) : NS_OK;
952 void nsMultiplexInputStream::AsyncWaitCompleted() {
953 nsCOMPtr<nsIInputStreamCallback> callback;
956 MutexAutoLock lock(mLock);
958 // The callback has been nullified in the meantime.
959 if (!mAsyncWaitCallback) {
960 return;
963 mAsyncWaitCallback.swap(callback);
964 mAsyncWaitEventTarget = nullptr;
967 callback->OnInputStreamReady(this);
970 nsresult nsMultiplexInputStreamConstructor(REFNSIID aIID, void** aResult) {
971 *aResult = nullptr;
973 RefPtr<nsMultiplexInputStream> inst = new nsMultiplexInputStream();
975 return inst->QueryInterface(aIID, aResult);
978 void nsMultiplexInputStream::SerializedComplexity(uint32_t aMaxSize,
979 uint32_t* aSizeUsed,
980 uint32_t* aPipes,
981 uint32_t* aTransferables) {
982 MutexAutoLock lock(mLock);
983 bool serializeAsPipe = false;
984 SerializedComplexityInternal(aMaxSize, aSizeUsed, aPipes, aTransferables,
985 &serializeAsPipe);
988 void nsMultiplexInputStream::SerializedComplexityInternal(
989 uint32_t aMaxSize, uint32_t* aSizeUsed, uint32_t* aPipes,
990 uint32_t* aTransferables, bool* aSerializeAsPipe) {
991 mLock.AssertCurrentThreadOwns();
992 CheckedUint32 totalSizeUsed = 0;
993 CheckedUint32 totalPipes = 0;
994 CheckedUint32 totalTransferables = 0;
995 CheckedUint32 maxSize = aMaxSize;
997 uint32_t streamCount = mStreams.Length();
999 for (uint32_t index = 0; index < streamCount; index++) {
1000 uint32_t sizeUsed = 0;
1001 uint32_t pipes = 0;
1002 uint32_t transferables = 0;
1003 InputStreamHelper::SerializedComplexity(mStreams[index].mOriginalStream,
1004 maxSize.value(), &sizeUsed, &pipes,
1005 &transferables);
1007 MOZ_ASSERT(maxSize.value() >= sizeUsed);
1009 maxSize -= sizeUsed;
1010 MOZ_DIAGNOSTIC_ASSERT(maxSize.isValid());
1011 totalSizeUsed += sizeUsed;
1012 MOZ_DIAGNOSTIC_ASSERT(totalSizeUsed.isValid());
1013 totalPipes += pipes;
1014 MOZ_DIAGNOSTIC_ASSERT(totalPipes.isValid());
1015 totalTransferables += transferables;
1016 MOZ_DIAGNOSTIC_ASSERT(totalTransferables.isValid());
1019 // If the combination of all streams when serialized independently is
1020 // sufficiently complex, we may choose to serialize it as a pipe to limit the
1021 // complexity of the payload.
1022 if (totalTransferables.value() == 0) {
1023 // If there are no transferables within our serialization, and it would
1024 // contain at least one pipe, serialize the entire payload as a pipe for
1025 // simplicity.
1026 *aSerializeAsPipe = totalSizeUsed.value() > 0 && totalPipes.value() > 0;
1027 } else {
1028 // Otherwise, we may want to still serialize in segments to take advantage
1029 // of the efficiency of serializing transferables. We'll only serialize as a
1030 // pipe if the total attachment count exceeds kMaxAttachmentThreshold.
1031 static constexpr uint32_t kMaxAttachmentThreshold = 8;
1032 CheckedUint32 totalAttachments = totalPipes + totalTransferables;
1033 *aSerializeAsPipe = !totalAttachments.isValid() ||
1034 totalAttachments.value() > kMaxAttachmentThreshold;
1037 if (*aSerializeAsPipe) {
1038 NS_WARNING(
1039 nsPrintfCString("Choosing to serialize multiplex stream as a pipe "
1040 "(would be %u bytes, %u pipes, %u transferables)",
1041 totalSizeUsed.value(), totalPipes.value(),
1042 totalTransferables.value())
1043 .get());
1044 *aSizeUsed = 0;
1045 *aPipes = 1;
1046 *aTransferables = 0;
1047 } else {
1048 *aSizeUsed = totalSizeUsed.value();
1049 *aPipes = totalPipes.value();
1050 *aTransferables = totalTransferables.value();
1054 void nsMultiplexInputStream::Serialize(InputStreamParams& aParams,
1055 uint32_t aMaxSize, uint32_t* aSizeUsed) {
1056 MutexAutoLock lock(mLock);
1058 // Check if we should serialize this stream as a pipe to reduce complexity.
1059 uint32_t dummySizeUsed = 0, dummyPipes = 0, dummyTransferables = 0;
1060 bool serializeAsPipe = false;
1061 SerializedComplexityInternal(aMaxSize, &dummySizeUsed, &dummyPipes,
1062 &dummyTransferables, &serializeAsPipe);
1063 if (serializeAsPipe) {
1064 *aSizeUsed = 0;
1065 MutexAutoUnlock unlock(mLock);
1066 InputStreamHelper::SerializeInputStreamAsPipe(this, aParams);
1067 return;
1070 MultiplexInputStreamParams params;
1072 CheckedUint32 totalSizeUsed = 0;
1073 CheckedUint32 maxSize = aMaxSize;
1075 uint32_t streamCount = mStreams.Length();
1076 if (streamCount) {
1077 nsTArray<InputStreamParams>& streams = params.streams();
1079 streams.SetCapacity(streamCount);
1080 for (uint32_t index = 0; index < streamCount; index++) {
1081 uint32_t sizeUsed = 0;
1082 InputStreamHelper::SerializeInputStream(mStreams[index].mOriginalStream,
1083 *streams.AppendElement(),
1084 maxSize.value(), &sizeUsed);
1086 MOZ_ASSERT(maxSize.value() >= sizeUsed);
1088 maxSize -= sizeUsed;
1089 MOZ_DIAGNOSTIC_ASSERT(maxSize.isValid());
1091 totalSizeUsed += sizeUsed;
1092 MOZ_DIAGNOSTIC_ASSERT(totalSizeUsed.isValid());
1096 params.currentStream() = mCurrentStream;
1097 params.status() = mStatus;
1098 params.startedReadingCurrent() = mStartedReadingCurrent;
1100 aParams = std::move(params);
1102 MOZ_ASSERT(aSizeUsed);
1103 *aSizeUsed = totalSizeUsed.value();
1106 bool nsMultiplexInputStream::Deserialize(const InputStreamParams& aParams) {
1107 if (aParams.type() != InputStreamParams::TMultiplexInputStreamParams) {
1108 NS_ERROR("Received unknown parameters from the other process!");
1109 return false;
1112 const MultiplexInputStreamParams& params =
1113 aParams.get_MultiplexInputStreamParams();
1115 const nsTArray<InputStreamParams>& streams = params.streams();
1117 uint32_t streamCount = streams.Length();
1118 for (uint32_t index = 0; index < streamCount; index++) {
1119 nsCOMPtr<nsIInputStream> stream =
1120 InputStreamHelper::DeserializeInputStream(streams[index]);
1121 if (!stream) {
1122 NS_WARNING("Deserialize failed!");
1123 return false;
1126 if (NS_FAILED(AppendStream(stream))) {
1127 NS_WARNING("AppendStream failed!");
1128 return false;
1132 MutexAutoLock lock(mLock);
1133 mCurrentStream = params.currentStream();
1134 mStatus = params.status();
1135 mStartedReadingCurrent = params.startedReadingCurrent();
1137 return true;
1140 NS_IMETHODIMP
1141 nsMultiplexInputStream::GetCloneable(bool* aCloneable) {
1142 MutexAutoLock lock(mLock);
1143 // XXXnsm Cloning a multiplex stream which has started reading is not
1144 // permitted right now.
1145 if (mCurrentStream > 0 || mStartedReadingCurrent) {
1146 *aCloneable = false;
1147 return NS_OK;
1150 uint32_t len = mStreams.Length();
1151 for (uint32_t i = 0; i < len; ++i) {
1152 nsCOMPtr<nsICloneableInputStream> cis =
1153 do_QueryInterface(mStreams[i].mBufferedStream);
1154 if (!cis || !cis->GetCloneable()) {
1155 *aCloneable = false;
1156 return NS_OK;
1160 *aCloneable = true;
1161 return NS_OK;
1164 NS_IMETHODIMP
1165 nsMultiplexInputStream::Clone(nsIInputStream** aClone) {
1166 MutexAutoLock lock(mLock);
1168 // XXXnsm Cloning a multiplex stream which has started reading is not
1169 // permitted right now.
1170 if (mCurrentStream > 0 || mStartedReadingCurrent) {
1171 return NS_ERROR_FAILURE;
1174 RefPtr<nsMultiplexInputStream> clone = new nsMultiplexInputStream();
1176 nsresult rv;
1177 uint32_t len = mStreams.Length();
1178 for (uint32_t i = 0; i < len; ++i) {
1179 nsCOMPtr<nsICloneableInputStream> substream =
1180 do_QueryInterface(mStreams[i].mBufferedStream);
1181 if (NS_WARN_IF(!substream)) {
1182 return NS_ERROR_FAILURE;
1185 nsCOMPtr<nsIInputStream> clonedSubstream;
1186 rv = substream->Clone(getter_AddRefs(clonedSubstream));
1187 if (NS_WARN_IF(NS_FAILED(rv))) {
1188 return rv;
1191 rv = clone->AppendStream(clonedSubstream);
1192 if (NS_WARN_IF(NS_FAILED(rv))) {
1193 return rv;
1197 clone.forget(aClone);
1198 return NS_OK;
1201 NS_IMETHODIMP
1202 nsMultiplexInputStream::Length(int64_t* aLength) {
1203 MutexAutoLock lock(mLock);
1205 if (mCurrentStream > 0 || mStartedReadingCurrent) {
1206 return NS_ERROR_NOT_AVAILABLE;
1209 CheckedInt64 length = 0;
1210 nsresult retval = NS_OK;
1212 for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
1213 nsCOMPtr<nsIInputStreamLength> substream =
1214 do_QueryInterface(mStreams[i].mBufferedStream);
1215 if (!substream) {
1216 // Let's use available as fallback.
1217 uint64_t streamAvail = 0;
1218 nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
1219 if (rv == NS_BASE_STREAM_CLOSED) {
1220 continue;
1223 if (NS_WARN_IF(NS_FAILED(rv))) {
1224 mStatus = rv;
1225 return mStatus;
1228 length += streamAvail;
1229 if (!length.isValid()) {
1230 return NS_ERROR_OUT_OF_MEMORY;
1233 continue;
1236 int64_t size = 0;
1237 nsresult rv = substream->Length(&size);
1238 if (rv == NS_BASE_STREAM_CLOSED) {
1239 continue;
1242 if (rv == NS_ERROR_NOT_AVAILABLE) {
1243 return rv;
1246 // If one stream blocks, we all block.
1247 if (rv != NS_BASE_STREAM_WOULD_BLOCK && NS_WARN_IF(NS_FAILED(rv))) {
1248 return rv;
1251 // We want to return WOULD_BLOCK if there is 1 stream that blocks. But want
1252 // to see if there are other streams with length = -1.
1253 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
1254 retval = NS_BASE_STREAM_WOULD_BLOCK;
1255 continue;
1258 // If one of the stream doesn't know the size, we all don't know the size.
1259 if (size == -1) {
1260 *aLength = -1;
1261 return NS_OK;
1264 length += size;
1265 if (!length.isValid()) {
1266 return NS_ERROR_OUT_OF_MEMORY;
1270 *aLength = length.value();
1271 return retval;
1274 class nsMultiplexInputStream::AsyncWaitLengthHelper final
1275 : public nsIInputStreamLengthCallback {
1276 public:
1277 NS_DECL_THREADSAFE_ISUPPORTS
1279 AsyncWaitLengthHelper()
1280 : mStreamNotified(false), mLength(0), mNegativeSize(false) {}
1282 bool AddStream(nsIAsyncInputStreamLength* aStream) {
1283 return mPendingStreams.AppendElement(aStream, fallible);
1286 bool AddSize(int64_t aSize) {
1287 MOZ_ASSERT(!mNegativeSize);
1289 mLength += aSize;
1290 return mLength.isValid();
1293 void NegativeSize() {
1294 MOZ_ASSERT(!mNegativeSize);
1295 mNegativeSize = true;
1298 nsresult Proceed(nsMultiplexInputStream* aParentStream,
1299 nsIEventTarget* aEventTarget,
1300 const MutexAutoLock& aProofOfLock) {
1301 MOZ_ASSERT(!mStream);
1303 // If we don't need to wait, let's inform the callback immediately.
1304 if (mPendingStreams.IsEmpty() || mNegativeSize) {
1305 RefPtr<nsMultiplexInputStream> parentStream = aParentStream;
1306 int64_t length = -1;
1307 if (!mNegativeSize && mLength.isValid()) {
1308 length = mLength.value();
1310 nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
1311 "AsyncWaitLengthHelper", [parentStream, length]() {
1312 MutexAutoLock lock(parentStream->GetLock());
1313 parentStream->AsyncWaitCompleted(length, lock);
1315 return aEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
1318 // Let's store the callback and the parent stream until we have
1319 // notifications from the async length streams.
1321 mStream = aParentStream;
1323 // Let's activate all the pending streams.
1324 for (nsIAsyncInputStreamLength* stream : mPendingStreams) {
1325 nsresult rv = stream->AsyncLengthWait(this, aEventTarget);
1326 if (rv == NS_BASE_STREAM_CLOSED) {
1327 continue;
1330 if (NS_WARN_IF(NS_FAILED(rv))) {
1331 return rv;
1335 return NS_OK;
1338 NS_IMETHOD
1339 OnInputStreamLengthReady(nsIAsyncInputStreamLength* aStream,
1340 int64_t aLength) override {
1341 MutexAutoLock lock(mStream->GetLock());
1343 MOZ_ASSERT(mPendingStreams.Contains(aStream));
1344 mPendingStreams.RemoveElement(aStream);
1346 // Already notified.
1347 if (mStreamNotified) {
1348 return NS_OK;
1351 if (aLength == -1) {
1352 mNegativeSize = true;
1353 } else {
1354 mLength += aLength;
1355 if (!mLength.isValid()) {
1356 mNegativeSize = true;
1360 // We need to wait.
1361 if (!mNegativeSize && !mPendingStreams.IsEmpty()) {
1362 return NS_OK;
1365 // Let's notify the parent stream.
1366 mStreamNotified = true;
1367 mStream->AsyncWaitCompleted(mNegativeSize ? -1 : mLength.value(), lock);
1368 return NS_OK;
1371 private:
1372 ~AsyncWaitLengthHelper() = default;
1374 RefPtr<nsMultiplexInputStream> mStream;
1375 bool mStreamNotified;
1377 CheckedInt64 mLength;
1378 bool mNegativeSize;
1380 nsTArray<nsCOMPtr<nsIAsyncInputStreamLength>> mPendingStreams;
1383 NS_IMPL_ISUPPORTS(nsMultiplexInputStream::AsyncWaitLengthHelper,
1384 nsIInputStreamLengthCallback)
1386 NS_IMETHODIMP
1387 nsMultiplexInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback,
1388 nsIEventTarget* aEventTarget) {
1389 if (NS_WARN_IF(!aEventTarget)) {
1390 return NS_ERROR_NULL_POINTER;
1393 MutexAutoLock lock(mLock);
1395 if (mCurrentStream > 0 || mStartedReadingCurrent) {
1396 return NS_ERROR_NOT_AVAILABLE;
1399 if (!aCallback) {
1400 mAsyncWaitLengthCallback = nullptr;
1401 return NS_OK;
1404 // We have a pending operation! Let's use this instead of creating a new one.
1405 if (mAsyncWaitLengthHelper) {
1406 mAsyncWaitLengthCallback = aCallback;
1407 return NS_OK;
1410 RefPtr<AsyncWaitLengthHelper> helper = new AsyncWaitLengthHelper();
1412 for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
1413 nsCOMPtr<nsIAsyncInputStreamLength> asyncStream =
1414 do_QueryInterface(mStreams[i].mBufferedStream);
1415 if (asyncStream) {
1416 if (NS_WARN_IF(!helper->AddStream(asyncStream))) {
1417 return NS_ERROR_OUT_OF_MEMORY;
1419 continue;
1422 nsCOMPtr<nsIInputStreamLength> stream =
1423 do_QueryInterface(mStreams[i].mBufferedStream);
1424 if (!stream) {
1425 // Let's use available as fallback.
1426 uint64_t streamAvail = 0;
1427 nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
1428 if (rv == NS_BASE_STREAM_CLOSED) {
1429 continue;
1432 if (NS_WARN_IF(NS_FAILED(rv))) {
1433 mStatus = rv;
1434 return mStatus;
1437 if (NS_WARN_IF(!helper->AddSize(streamAvail))) {
1438 return NS_ERROR_OUT_OF_MEMORY;
1441 continue;
1444 int64_t size = 0;
1445 nsresult rv = stream->Length(&size);
1446 if (rv == NS_BASE_STREAM_CLOSED) {
1447 continue;
1450 MOZ_ASSERT(rv != NS_BASE_STREAM_WOULD_BLOCK,
1451 "A nsILengthInutStream returns NS_BASE_STREAM_WOULD_BLOCK but "
1452 "it doesn't implement nsIAsyncInputStreamLength.");
1454 if (NS_WARN_IF(NS_FAILED(rv))) {
1455 return rv;
1458 if (size == -1) {
1459 helper->NegativeSize();
1460 break;
1463 if (NS_WARN_IF(!helper->AddSize(size))) {
1464 return NS_ERROR_OUT_OF_MEMORY;
1468 nsresult rv = helper->Proceed(this, aEventTarget, lock);
1469 if (NS_WARN_IF(NS_FAILED(rv))) {
1470 return rv;
1473 mAsyncWaitLengthHelper = helper;
1474 mAsyncWaitLengthCallback = aCallback;
1475 return NS_OK;
1478 void nsMultiplexInputStream::AsyncWaitCompleted(
1479 int64_t aLength, const MutexAutoLock& aProofOfLock) {
1480 mLock.AssertCurrentThreadOwns();
1482 nsCOMPtr<nsIInputStreamLengthCallback> callback;
1483 callback.swap(mAsyncWaitLengthCallback);
1485 mAsyncWaitLengthHelper = nullptr;
1487 // Already canceled.
1488 if (!callback) {
1489 return;
1492 MutexAutoUnlock unlock(mLock);
1493 callback->OnInputStreamLengthReady(this, aLength);
1496 #define MAYBE_UPDATE_VALUE_REAL(x, y) \
1497 if (y) { \
1498 ++x; \
1501 #define MAYBE_UPDATE_VALUE(x, y) \
1503 nsCOMPtr<y> substream = do_QueryInterface(aStream.mBufferedStream); \
1504 MAYBE_UPDATE_VALUE_REAL(x, substream) \
1507 #define MAYBE_UPDATE_BOOL(x, y) \
1508 if (!x) { \
1509 nsCOMPtr<y> substream = do_QueryInterface(aStream.mBufferedStream); \
1510 if (substream) { \
1511 x = true; \
1515 void nsMultiplexInputStream::UpdateQIMap(StreamData& aStream) {
1516 auto length = mStreams.Length();
1518 MAYBE_UPDATE_VALUE_REAL(mSeekableStreams, aStream.mSeekableStream)
1519 mIsSeekableStream = (mSeekableStreams == length);
1520 MAYBE_UPDATE_VALUE(mIPCSerializableStreams, nsIIPCSerializableInputStream)
1521 mIsIPCSerializableStream = (mIPCSerializableStreams == length);
1522 MAYBE_UPDATE_VALUE(mCloneableStreams, nsICloneableInputStream)
1523 mIsCloneableStream = (mCloneableStreams == length);
1524 // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
1525 // substream implements that interface
1526 if (!mIsAsyncInputStream && aStream.mAsyncStream) {
1527 mIsAsyncInputStream = true;
1529 MAYBE_UPDATE_BOOL(mIsInputStreamLength, nsIInputStreamLength)
1530 MAYBE_UPDATE_BOOL(mIsAsyncInputStreamLength, nsIAsyncInputStreamLength)
1533 #undef MAYBE_UPDATE_VALUE
1534 #undef MAYBE_UPDATE_VALUE_REAL
1535 #undef MAYBE_UPDATE_BOOL
1537 bool nsMultiplexInputStream::IsSeekable() const { return mIsSeekableStream; }
1539 bool nsMultiplexInputStream::IsIPCSerializable() const {
1540 return mIsIPCSerializableStream;
1543 bool nsMultiplexInputStream::IsCloneable() const { return mIsCloneableStream; }
1545 bool nsMultiplexInputStream::IsAsyncInputStream() const {
1546 // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
1547 // substream implements that interface.
1548 return mIsAsyncInputStream;
1551 bool nsMultiplexInputStream::IsInputStreamLength() const {
1552 return mIsInputStreamLength;
1555 bool nsMultiplexInputStream::IsAsyncInputStreamLength() const {
1556 return mIsAsyncInputStreamLength;