1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 * The multiplex stream concatenates a list of input streams into a single
12 #include "mozilla/Attributes.h"
13 #include "mozilla/CheckedInt.h"
14 #include "mozilla/MathAlgorithms.h"
15 #include "mozilla/Mutex.h"
17 #include "base/basictypes.h"
19 #include "nsMultiplexInputStream.h"
20 #include "nsIBufferedStreams.h"
21 #include "nsICloneableInputStream.h"
22 #include "nsIMultiplexInputStream.h"
23 #include "nsISeekableStream.h"
25 #include "nsCOMArray.h"
26 #include "nsIClassInfoImpl.h"
27 #include "nsIIPCSerializableInputStream.h"
28 #include "mozilla/ipc/InputStreamUtils.h"
29 #include "nsIAsyncInputStream.h"
30 #include "nsIInputStreamLength.h"
31 #include "nsNetUtil.h"
32 #include "nsStreamUtils.h"
34 using namespace mozilla
;
35 using namespace mozilla::ipc
;
37 using mozilla::DeprecatedAbs
;
39 class nsMultiplexInputStream final
: public nsIMultiplexInputStream
,
40 public nsISeekableStream
,
41 public nsIIPCSerializableInputStream
,
42 public nsICloneableInputStream
,
43 public nsIAsyncInputStream
,
44 public nsIInputStreamCallback
,
45 public nsIInputStreamLength
,
46 public nsIAsyncInputStreamLength
{
48 nsMultiplexInputStream();
50 NS_DECL_THREADSAFE_ISUPPORTS
51 NS_DECL_NSIINPUTSTREAM
52 NS_DECL_NSIMULTIPLEXINPUTSTREAM
53 NS_DECL_NSISEEKABLESTREAM
54 NS_DECL_NSITELLABLESTREAM
55 NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
56 NS_DECL_NSICLONEABLEINPUTSTREAM
57 NS_DECL_NSIASYNCINPUTSTREAM
58 NS_DECL_NSIINPUTSTREAMCALLBACK
59 NS_DECL_NSIINPUTSTREAMLENGTH
60 NS_DECL_NSIASYNCINPUTSTREAMLENGTH
62 // This is used for nsIAsyncInputStream::AsyncWait
63 void AsyncWaitCompleted();
65 // This is used for nsIAsyncInputStreamLength::AsyncLengthWait
66 void AsyncWaitCompleted(int64_t aLength
, const MutexAutoLock
& aProofOfLock
)
70 nsresult
Initialize(nsIInputStream
* aOriginalStream
) {
73 mOriginalStream
= aOriginalStream
;
75 mBufferedStream
= aOriginalStream
;
76 if (!NS_InputStreamIsBuffered(mBufferedStream
)) {
77 nsCOMPtr
<nsIInputStream
> bufferedStream
;
78 nsresult rv
= NS_NewBufferedInputStream(getter_AddRefs(bufferedStream
),
79 mBufferedStream
.forget(), 4096);
80 NS_ENSURE_SUCCESS(rv
, rv
);
81 mBufferedStream
= bufferedStream
;
84 mAsyncStream
= do_QueryInterface(mBufferedStream
);
85 mSeekableStream
= do_QueryInterface(mBufferedStream
);
90 nsCOMPtr
<nsIInputStream
> mOriginalStream
;
92 // Equal to mOriginalStream or a wrap around the original stream to make it
94 nsCOMPtr
<nsIInputStream
> mBufferedStream
;
97 nsCOMPtr
<nsIAsyncInputStream
> mAsyncStream
;
99 nsCOMPtr
<nsISeekableStream
> mSeekableStream
;
101 uint64_t mCurrentPos
;
// Returns a reference to mLock, the mutex guarding this object's data
// members, so that code outside the class body (e.g. AsyncWaitLengthHelper)
// can acquire it. MOZ_RETURN_CAPABILITY ties the returned reference to mLock
// for the thread-safety annotations.
104 Mutex
& GetLock() MOZ_RETURN_CAPABILITY(mLock
) { return mLock
; }
107 ~nsMultiplexInputStream() = default;
109 void NextStream() MOZ_REQUIRES(mLock
) {
111 mStartedReadingCurrent
= false;
114 nsresult
AsyncWaitInternal();
116 // This method updates mSeekableStreams, mTellableStreams,
117 // mIPCSerializableStreams and mCloneableStreams values.
118 void UpdateQIMap(StreamData
& aStream
) MOZ_REQUIRES(mLock
);
120 struct MOZ_STACK_CLASS ReadSegmentsState
{
121 nsCOMPtr
<nsIInputStream
> mThisStream
;
123 nsWriteSegmentFun mWriter
;
128 void SerializedComplexityInternal(uint32_t aMaxSize
, uint32_t* aSizeUsed
,
129 uint32_t* aPipes
, uint32_t* aTransferables
,
130 bool* aSerializeAsPipe
);
132 static nsresult
ReadSegCb(nsIInputStream
* aIn
, void* aClosure
,
133 const char* aFromRawSegment
, uint32_t aToOffset
,
134 uint32_t aCount
, uint32_t* aWriteCount
);
136 bool IsSeekable() const;
137 bool IsIPCSerializable() const;
138 bool IsCloneable() const;
139 bool IsAsyncInputStream() const;
140 bool IsInputStreamLength() const;
141 bool IsAsyncInputStreamLength() const;
143 Mutex mLock
; // Protects access to all data members.
145 nsTArray
<StreamData
> mStreams
MOZ_GUARDED_BY(mLock
);
147 uint32_t mCurrentStream
MOZ_GUARDED_BY(mLock
);
148 bool mStartedReadingCurrent
MOZ_GUARDED_BY(mLock
);
149 nsresult mStatus
MOZ_GUARDED_BY(mLock
);
150 nsCOMPtr
<nsIInputStreamCallback
> mAsyncWaitCallback
MOZ_GUARDED_BY(mLock
);
151 uint32_t mAsyncWaitFlags
MOZ_GUARDED_BY(mLock
);
152 uint32_t mAsyncWaitRequestedCount
MOZ_GUARDED_BY(mLock
);
153 nsCOMPtr
<nsIEventTarget
> mAsyncWaitEventTarget
MOZ_GUARDED_BY(mLock
);
154 nsCOMPtr
<nsIInputStreamLengthCallback
> mAsyncWaitLengthCallback
155 MOZ_GUARDED_BY(mLock
);
157 class AsyncWaitLengthHelper
;
158 RefPtr
<AsyncWaitLengthHelper
> mAsyncWaitLengthHelper
MOZ_GUARDED_BY(mLock
);
160 uint32_t mSeekableStreams
MOZ_GUARDED_BY(mLock
);
161 uint32_t mIPCSerializableStreams
MOZ_GUARDED_BY(mLock
);
162 uint32_t mCloneableStreams
MOZ_GUARDED_BY(mLock
);
164 // These are Atomics so that we can check them in QueryInterface without
165 // taking a lock (to look at mStreams.Length() and the numbers above)
166 // With no streams added yet, all of these are possible
167 Atomic
<bool, Relaxed
> mIsSeekableStream
{true};
168 Atomic
<bool, Relaxed
> mIsIPCSerializableStream
{true};
169 Atomic
<bool, Relaxed
> mIsCloneableStream
{true};
171 Atomic
<bool, Relaxed
> mIsAsyncInputStream
{false};
172 Atomic
<bool, Relaxed
> mIsInputStreamLength
{false};
173 Atomic
<bool, Relaxed
> mIsAsyncInputStreamLength
{false};
176 NS_IMPL_ADDREF(nsMultiplexInputStream
)
177 NS_IMPL_RELEASE(nsMultiplexInputStream
)
179 NS_IMPL_CLASSINFO(nsMultiplexInputStream
, nullptr, nsIClassInfo::THREADSAFE
,
180 NS_MULTIPLEXINPUTSTREAM_CID
)
182 NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream
)
183 NS_INTERFACE_MAP_ENTRY(nsIMultiplexInputStream
)
184 NS_INTERFACE_MAP_ENTRY(nsIInputStream
)
185 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream
, IsSeekable())
186 NS_INTERFACE_MAP_ENTRY(nsITellableStream
)
187 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream
,
189 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream
, IsCloneable())
190 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream
, IsAsyncInputStream())
191 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback
,
192 IsAsyncInputStream())
193 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength
,
194 IsInputStreamLength())
195 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength
,
196 IsAsyncInputStreamLength())
197 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports
, nsIMultiplexInputStream
)
198 NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream
)
201 NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream
, nsIMultiplexInputStream
,
202 nsIInputStream
, nsISeekableStream
,
205 static nsresult
AvailableMaybeSeek(nsMultiplexInputStream::StreamData
& aStream
,
207 nsresult rv
= aStream
.mBufferedStream
->Available(aResult
);
208 if (rv
== NS_BASE_STREAM_CLOSED
) {
209 // Blindly seek to the current position if Available() returns
210 // NS_BASE_STREAM_CLOSED.
211 // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
212 // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
213 if (aStream
.mSeekableStream
) {
215 aStream
.mSeekableStream
->Seek(nsISeekableStream::NS_SEEK_CUR
, 0);
216 if (NS_SUCCEEDED(rvSeek
)) {
217 rv
= aStream
.mBufferedStream
->Available(aResult
);
224 nsMultiplexInputStream::nsMultiplexInputStream()
225 : mLock("nsMultiplexInputStream lock"),
227 mStartedReadingCurrent(false),
230 mAsyncWaitRequestedCount(0),
232 mIPCSerializableStreams(0),
233 mCloneableStreams(0) {}
236 nsMultiplexInputStream::GetCount(uint32_t* aCount
) {
237 MutexAutoLock
lock(mLock
);
238 *aCount
= mStreams
.Length();
243 nsMultiplexInputStream::AppendStream(nsIInputStream
* aStream
) {
244 MutexAutoLock
lock(mLock
);
246 StreamData
* streamData
= mStreams
.AppendElement(fallible
);
247 if (NS_WARN_IF(!streamData
)) {
248 return NS_ERROR_OUT_OF_MEMORY
;
251 nsresult rv
= streamData
->Initialize(aStream
);
252 NS_ENSURE_SUCCESS(rv
, rv
);
254 UpdateQIMap(*streamData
);
256 if (mStatus
== NS_BASE_STREAM_CLOSED
) {
257 // We were closed, but now we have more data to read.
265 nsMultiplexInputStream::GetStream(uint32_t aIndex
, nsIInputStream
** aResult
) {
266 MutexAutoLock
lock(mLock
);
268 if (aIndex
>= mStreams
.Length()) {
269 return NS_ERROR_NOT_AVAILABLE
;
272 StreamData
& streamData
= mStreams
.ElementAt(aIndex
);
273 nsCOMPtr
<nsIInputStream
> stream
= streamData
.mOriginalStream
;
274 stream
.forget(aResult
);
279 nsMultiplexInputStream::Close() {
280 nsTArray
<nsCOMPtr
<nsIInputStream
>> streams
;
282 // Let's take a copy of the streams because calling close() could trigger
283 // a nsIInputStreamCallback immediately and we don't want to create a deadlock
286 MutexAutoLock
lock(mLock
);
287 uint32_t len
= mStreams
.Length();
288 for (uint32_t i
= 0; i
< len
; ++i
) {
290 !streams
.AppendElement(mStreams
[i
].mBufferedStream
, fallible
))) {
291 mStatus
= NS_BASE_STREAM_CLOSED
;
292 return NS_ERROR_OUT_OF_MEMORY
;
295 mStatus
= NS_BASE_STREAM_CLOSED
;
300 uint32_t len
= streams
.Length();
301 for (uint32_t i
= 0; i
< len
; ++i
) {
302 nsresult rv2
= streams
[i
]->Close();
303 // We still want to close all streams, but we should return an error
304 if (NS_FAILED(rv2
)) {
313 nsMultiplexInputStream::Available(uint64_t* aResult
) {
316 MutexAutoLock
lock(mLock
);
317 if (NS_FAILED(mStatus
)) {
322 nsresult rv
= NS_BASE_STREAM_CLOSED
;
324 uint32_t len
= mStreams
.Length();
325 for (uint32_t i
= mCurrentStream
; i
< len
; i
++) {
326 uint64_t streamAvail
;
327 rv
= AvailableMaybeSeek(mStreams
[i
], &streamAvail
);
328 if (rv
== NS_BASE_STREAM_CLOSED
) {
329 // If a stream is closed, we continue with the next one.
330 // If this is the current stream we move to the following stream.
331 if (mCurrentStream
== i
) {
335 // If this is the last stream, we want to return this error code.
339 if (NS_WARN_IF(NS_FAILED(rv
))) {
344 // If the current stream is async, we have to return what we have so far
345 // without processing the following streams. This is needed because
346 // ::Available should return only what is currently available. In case of an
347 // nsIAsyncInputStream, we have to call AsyncWait() in order to read more.
348 if (mStreams
[i
].mAsyncStream
) {
349 avail
+= streamAvail
;
353 if (streamAvail
== 0) {
354 // Nothing to read for this stream. Let's move to the next one.
358 avail
+= streamAvail
;
361 // We still have something to read. We don't want to return an error code yet.
367 // Let's propagate the last error message.
373 nsMultiplexInputStream::StreamStatus() {
374 MutexAutoLock
lock(mLock
);
379 nsMultiplexInputStream::Read(char* aBuf
, uint32_t aCount
, uint32_t* aResult
) {
380 MutexAutoLock
lock(mLock
);
381 // It is tempting to implement this method in terms of ReadSegments, but
382 // that would prevent this class from being used with streams that only
383 // implement Read (e.g., file streams).
387 if (mStatus
== NS_BASE_STREAM_CLOSED
) {
390 if (NS_FAILED(mStatus
)) {
396 uint32_t len
= mStreams
.Length();
397 while (mCurrentStream
< len
&& aCount
) {
399 rv
= mStreams
[mCurrentStream
].mBufferedStream
->Read(aBuf
, aCount
, &read
);
401 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
402 // (This is a bug in those stream implementations)
403 if (rv
== NS_BASE_STREAM_CLOSED
) {
404 MOZ_ASSERT_UNREACHABLE(
405 "Input stream's Read method returned "
406 "NS_BASE_STREAM_CLOSED");
409 } else if (NS_FAILED(rv
)) {
416 NS_ASSERTION(aCount
>= read
, "Read more than requested");
420 mStartedReadingCurrent
= true;
422 mStreams
[mCurrentStream
].mCurrentPos
+= read
;
425 return *aResult
? NS_OK
: rv
;
429 nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
430 uint32_t aCount
, uint32_t* aResult
) {
431 MutexAutoLock
lock(mLock
);
433 if (mStatus
== NS_BASE_STREAM_CLOSED
) {
437 if (NS_FAILED(mStatus
)) {
441 NS_ASSERTION(aWriter
, "missing aWriter");
444 ReadSegmentsState state
;
445 state
.mThisStream
= this;
447 state
.mWriter
= aWriter
;
448 state
.mClosure
= aClosure
;
451 uint32_t len
= mStreams
.Length();
452 while (mCurrentStream
< len
&& aCount
) {
454 rv
= mStreams
[mCurrentStream
].mBufferedStream
->ReadSegments(
455 ReadSegCb
, &state
, aCount
, &read
);
457 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
458 // (This is a bug in those stream implementations)
459 if (rv
== NS_BASE_STREAM_CLOSED
) {
460 MOZ_ASSERT_UNREACHABLE(
461 "Input stream's Read method returned "
462 "NS_BASE_STREAM_CLOSED");
467 // if |aWriter| decided to stop reading segments...
468 if (state
.mDone
|| NS_FAILED(rv
)) {
472 // if stream is empty, then advance to the next stream.
476 NS_ASSERTION(aCount
>= read
, "Read more than requested");
477 state
.mOffset
+= read
;
479 mStartedReadingCurrent
= true;
481 mStreams
[mCurrentStream
].mCurrentPos
+= read
;
485 // if we successfully read some data, then this call succeeded.
486 *aResult
= state
.mOffset
;
487 return state
.mOffset
? NS_OK
: rv
;
490 nsresult
nsMultiplexInputStream::ReadSegCb(nsIInputStream
* aIn
, void* aClosure
,
491 const char* aFromRawSegment
,
492 uint32_t aToOffset
, uint32_t aCount
,
493 uint32_t* aWriteCount
) {
495 ReadSegmentsState
* state
= (ReadSegmentsState
*)aClosure
;
496 rv
= (state
->mWriter
)(state
->mThisStream
, state
->mClosure
, aFromRawSegment
,
497 aToOffset
+ state
->mOffset
, aCount
, aWriteCount
);
505 nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking
) {
506 MutexAutoLock
lock(mLock
);
508 uint32_t len
= mStreams
.Length();
510 // Claim to be non-blocking, since we won't block the caller.
511 *aNonBlocking
= true;
515 for (uint32_t i
= 0; i
< len
; ++i
) {
516 nsresult rv
= mStreams
[i
].mBufferedStream
->IsNonBlocking(aNonBlocking
);
517 if (NS_WARN_IF(NS_FAILED(rv
))) {
520 // If one is blocking the entire stream becomes blocking.
521 if (!*aNonBlocking
) {
530 nsMultiplexInputStream::Seek(int32_t aWhence
, int64_t aOffset
) {
531 MutexAutoLock
lock(mLock
);
533 if (NS_FAILED(mStatus
)) {
539 uint32_t oldCurrentStream
= mCurrentStream
;
540 bool oldStartedReadingCurrent
= mStartedReadingCurrent
;
542 if (aWhence
== NS_SEEK_SET
) {
543 int64_t remaining
= aOffset
;
547 for (uint32_t i
= 0; i
< mStreams
.Length(); ++i
) {
548 nsCOMPtr
<nsISeekableStream
> stream
= mStreams
[i
].mSeekableStream
;
550 return NS_ERROR_FAILURE
;
553 // See if all remaining streams should be rewound
554 if (remaining
== 0) {
555 if (i
< oldCurrentStream
||
556 (i
== oldCurrentStream
&& oldStartedReadingCurrent
)) {
557 rv
= stream
->Seek(NS_SEEK_SET
, 0);
558 if (NS_WARN_IF(NS_FAILED(rv
))) {
562 mStreams
[i
].mCurrentPos
= 0;
569 // Get position in the current stream
571 if (i
> oldCurrentStream
||
572 (i
== oldCurrentStream
&& !oldStartedReadingCurrent
)) {
575 streamPos
= mStreams
[i
].mCurrentPos
;
578 // See if we need to seek the current stream forward or backward
579 if (remaining
< streamPos
) {
580 rv
= stream
->Seek(NS_SEEK_SET
, remaining
);
581 if (NS_WARN_IF(NS_FAILED(rv
))) {
585 mStreams
[i
].mCurrentPos
= remaining
;
587 mStartedReadingCurrent
= remaining
!= 0;
590 } else if (remaining
> streamPos
) {
591 if (i
< oldCurrentStream
) {
592 // We're already at end so no need to seek this stream
593 remaining
-= streamPos
;
594 NS_ASSERTION(remaining
>= 0, "Remaining invalid");
597 rv
= AvailableMaybeSeek(mStreams
[i
], &avail
);
598 if (NS_WARN_IF(NS_FAILED(rv
))) {
602 int64_t newPos
= XPCOM_MIN(remaining
, streamPos
+ (int64_t)avail
);
604 rv
= stream
->Seek(NS_SEEK_SET
, newPos
);
605 if (NS_WARN_IF(NS_FAILED(rv
))) {
609 mStreams
[i
].mCurrentPos
= newPos
;
611 mStartedReadingCurrent
= true;
614 NS_ASSERTION(remaining
>= 0, "Remaining invalid");
617 NS_ASSERTION(remaining
== streamPos
, "Huh?");
618 MOZ_ASSERT(remaining
!= 0, "Zero remaining should be handled earlier");
621 mStartedReadingCurrent
= true;
628 if (aWhence
== NS_SEEK_CUR
&& aOffset
> 0) {
629 int64_t remaining
= aOffset
;
630 for (uint32_t i
= mCurrentStream
; remaining
&& i
< mStreams
.Length(); ++i
) {
632 rv
= AvailableMaybeSeek(mStreams
[i
], &avail
);
633 if (NS_WARN_IF(NS_FAILED(rv
))) {
637 int64_t seek
= XPCOM_MIN((int64_t)avail
, remaining
);
639 rv
= mStreams
[i
].mSeekableStream
->Seek(NS_SEEK_CUR
, seek
);
640 if (NS_WARN_IF(NS_FAILED(rv
))) {
644 mStreams
[i
].mCurrentPos
+= seek
;
646 mStartedReadingCurrent
= true;
654 if (aWhence
== NS_SEEK_CUR
&& aOffset
< 0) {
655 int64_t remaining
= -aOffset
;
656 for (uint32_t i
= mCurrentStream
; remaining
&& i
!= (uint32_t)-1; --i
) {
657 int64_t pos
= mStreams
[i
].mCurrentPos
;
659 int64_t seek
= XPCOM_MIN(pos
, remaining
);
661 rv
= mStreams
[i
].mSeekableStream
->Seek(NS_SEEK_CUR
, -seek
);
662 if (NS_WARN_IF(NS_FAILED(rv
))) {
666 mStreams
[i
].mCurrentPos
-= seek
;
668 mStartedReadingCurrent
= seek
!= -pos
;
676 if (aWhence
== NS_SEEK_CUR
) {
677 NS_ASSERTION(aOffset
== 0, "Should have handled all non-zero values");
682 if (aWhence
== NS_SEEK_END
) {
684 return NS_ERROR_INVALID_ARG
;
687 int64_t remaining
= aOffset
;
689 for (i
= mStreams
.Length() - 1; i
>= 0; --i
) {
690 nsCOMPtr
<nsISeekableStream
> stream
= mStreams
[i
].mSeekableStream
;
693 rv
= AvailableMaybeSeek(mStreams
[i
], &avail
);
694 if (NS_WARN_IF(NS_FAILED(rv
))) {
698 int64_t streamLength
= avail
+ mStreams
[i
].mCurrentPos
;
700 // The seek(END) can be completed in the current stream.
701 if (streamLength
>= DeprecatedAbs(remaining
)) {
702 rv
= stream
->Seek(NS_SEEK_END
, remaining
);
703 if (NS_WARN_IF(NS_FAILED(rv
))) {
707 mStreams
[i
].mCurrentPos
= streamLength
+ remaining
;
709 mStartedReadingCurrent
= true;
713 // We are at the beginning of this stream.
714 rv
= stream
->Seek(NS_SEEK_SET
, 0);
715 if (NS_WARN_IF(NS_FAILED(rv
))) {
719 remaining
+= streamLength
;
720 mStreams
[i
].mCurrentPos
= 0;
723 // Any other stream must be set to the end.
724 for (--i
; i
>= 0; --i
) {
725 nsCOMPtr
<nsISeekableStream
> stream
= mStreams
[i
].mSeekableStream
;
728 rv
= AvailableMaybeSeek(mStreams
[i
], &avail
);
729 if (NS_WARN_IF(NS_FAILED(rv
))) {
733 int64_t streamLength
= avail
+ mStreams
[i
].mCurrentPos
;
735 rv
= stream
->Seek(NS_SEEK_END
, 0);
736 if (NS_WARN_IF(NS_FAILED(rv
))) {
740 mStreams
[i
].mCurrentPos
= streamLength
;
746 // other Seeks not implemented yet
747 return NS_ERROR_NOT_IMPLEMENTED
;
751 nsMultiplexInputStream::Tell(int64_t* aResult
) {
752 MutexAutoLock
lock(mLock
);
754 if (NS_FAILED(mStatus
)) {
760 bool zeroFound
= false;
763 for (uint32_t i
= 0; i
< mStreams
.Length(); ++i
) {
764 ret64
+= mStreams
[i
].mCurrentPos
;
767 // When we see 1 stream with currentPos = 0, all the remaining streams must
768 // be set to 0 as well.
769 MOZ_ASSERT_IF(zeroFound
, mStreams
[i
].mCurrentPos
== 0);
770 if (mStreams
[i
].mCurrentPos
== 0) {
// Truncating at the current position is not supported by the multiplex
// stream: this always returns NS_ERROR_NOT_IMPLEMENTED.
781 nsMultiplexInputStream::SetEOF() { return NS_ERROR_NOT_IMPLEMENTED
; }
// aStatus is ignored; this simply forwards to Close() and returns its result.
784 nsMultiplexInputStream::CloseWithStatus(nsresult aStatus
) { return Close(); }
786 // This class is used to inform nsMultiplexInputStream that it's time to execute
787 // the asyncWait callback.
788 class AsyncWaitRunnable final
: public DiscardableRunnable
{
789 RefPtr
<nsMultiplexInputStream
> mStream
;
792 static void Create(nsMultiplexInputStream
* aStream
,
793 nsIEventTarget
* aEventTarget
) {
794 RefPtr
<AsyncWaitRunnable
> runnable
= new AsyncWaitRunnable(aStream
);
796 aEventTarget
->Dispatch(runnable
.forget(), NS_DISPATCH_NORMAL
);
804 mStream
->AsyncWaitCompleted();
809 explicit AsyncWaitRunnable(nsMultiplexInputStream
* aStream
)
810 : DiscardableRunnable("AsyncWaitRunnable"), mStream(aStream
) {
816 nsMultiplexInputStream::AsyncWait(nsIInputStreamCallback
* aCallback
,
817 uint32_t aFlags
, uint32_t aRequestedCount
,
818 nsIEventTarget
* aEventTarget
) {
820 MutexAutoLock
lock(mLock
);
822 // We must execute the callback also when the stream is closed.
823 if (NS_FAILED(mStatus
) && mStatus
!= NS_BASE_STREAM_CLOSED
) {
827 if (NS_WARN_IF(mAsyncWaitCallback
&& aCallback
&&
828 mAsyncWaitCallback
!= aCallback
)) {
829 return NS_ERROR_FAILURE
;
832 mAsyncWaitCallback
= aCallback
;
833 mAsyncWaitFlags
= aFlags
;
834 mAsyncWaitRequestedCount
= aRequestedCount
;
835 mAsyncWaitEventTarget
= aEventTarget
;
838 return AsyncWaitInternal();
841 nsresult
nsMultiplexInputStream::AsyncWaitInternal() {
842 nsCOMPtr
<nsIAsyncInputStream
> stream
;
843 nsIInputStreamCallback
* asyncWaitCallback
= nullptr;
844 uint32_t asyncWaitFlags
= 0;
845 uint32_t asyncWaitRequestedCount
= 0;
846 nsCOMPtr
<nsIEventTarget
> asyncWaitEventTarget
;
849 MutexAutoLock
lock(mLock
);
851 // Let's take the first async stream if we are not already closed, and if
852 // it has data to read or if it is async.
853 if (mStatus
!= NS_BASE_STREAM_CLOSED
) {
854 for (; mCurrentStream
< mStreams
.Length(); NextStream()) {
855 stream
= mStreams
[mCurrentStream
].mAsyncStream
;
861 nsresult rv
= AvailableMaybeSeek(mStreams
[mCurrentStream
], &avail
);
862 if (rv
== NS_BASE_STREAM_CLOSED
|| (NS_SUCCEEDED(rv
) && avail
== 0)) {
863 // Nothing to read here. Let's move on.
875 asyncWaitCallback
= mAsyncWaitCallback
? this : nullptr;
876 asyncWaitFlags
= mAsyncWaitFlags
;
877 asyncWaitRequestedCount
= mAsyncWaitRequestedCount
;
878 asyncWaitEventTarget
= mAsyncWaitEventTarget
;
880 MOZ_ASSERT_IF(stream
, NS_SUCCEEDED(mStatus
));
883 // If we are here it's because we are already closed, or if the current stream
884 // is not async. In both cases we have to execute the callback.
886 if (asyncWaitCallback
) {
887 AsyncWaitRunnable::Create(this, asyncWaitEventTarget
);
892 return stream
->AsyncWait(asyncWaitCallback
, asyncWaitFlags
,
893 asyncWaitRequestedCount
, asyncWaitEventTarget
);
897 nsMultiplexInputStream::OnInputStreamReady(nsIAsyncInputStream
* aStream
) {
898 nsCOMPtr
<nsIInputStreamCallback
> callback
;
900 // When OnInputStreamReady is called, we could be in 2 scenarios:
901 // a. there is something to read;
902 // b. the stream is closed.
903 // But if the stream is closed and it was not the last one, we must proceed
904 // with the following stream in order to have something to read by the callee.
907 MutexAutoLock
lock(mLock
);
909 // The callback has been nullified in the meantime.
910 if (!mAsyncWaitCallback
) {
914 if (NS_SUCCEEDED(mStatus
)) {
917 // Only check `Available()` if `aStream` is actually the current stream,
918 // otherwise we'll always want to re-poll, as we got the callback for the
920 if (mCurrentStream
< mStreams
.Length() &&
921 aStream
== mStreams
[mCurrentStream
].mAsyncStream
) {
922 rv
= aStream
->Available(&avail
);
924 if (rv
== NS_BASE_STREAM_CLOSED
|| (NS_SUCCEEDED(rv
) && avail
== 0)) {
925 // This stream is either closed, has no data available, or is not the
926 // current stream. If it is closed and current, move to the next stream,
927 // otherwise re-wait on the current stream until it has data available
928 // or becomes closed.
929 // Unlike streams not implementing nsIAsyncInputStream, async streams
930 // cannot use `Available() == 0` to indicate EOF, so we re-poll in that
936 // Unlock and invoke AsyncWaitInternal to wait again. If this succeeds,
937 // we'll be called again, otherwise fall through and notify.
938 MutexAutoUnlock
unlock(mLock
);
939 if (NS_SUCCEEDED(AsyncWaitInternal())) {
945 mAsyncWaitCallback
.swap(callback
);
946 mAsyncWaitEventTarget
= nullptr;
949 return callback
? callback
->OnInputStreamReady(this) : NS_OK
;
952 void nsMultiplexInputStream::AsyncWaitCompleted() {
953 nsCOMPtr
<nsIInputStreamCallback
> callback
;
956 MutexAutoLock
lock(mLock
);
958 // The callback has been nullified in the meantime.
959 if (!mAsyncWaitCallback
) {
963 mAsyncWaitCallback
.swap(callback
);
964 mAsyncWaitEventTarget
= nullptr;
967 callback
->OnInputStreamReady(this);
970 nsresult
nsMultiplexInputStreamConstructor(REFNSIID aIID
, void** aResult
) {
973 RefPtr
<nsMultiplexInputStream
> inst
= new nsMultiplexInputStream();
975 return inst
->QueryInterface(aIID
, aResult
);
978 void nsMultiplexInputStream::SerializedComplexity(uint32_t aMaxSize
,
981 uint32_t* aTransferables
) {
982 MutexAutoLock
lock(mLock
);
983 bool serializeAsPipe
= false;
984 SerializedComplexityInternal(aMaxSize
, aSizeUsed
, aPipes
, aTransferables
,
988 void nsMultiplexInputStream::SerializedComplexityInternal(
989 uint32_t aMaxSize
, uint32_t* aSizeUsed
, uint32_t* aPipes
,
990 uint32_t* aTransferables
, bool* aSerializeAsPipe
) {
991 mLock
.AssertCurrentThreadOwns();
992 CheckedUint32 totalSizeUsed
= 0;
993 CheckedUint32 totalPipes
= 0;
994 CheckedUint32 totalTransferables
= 0;
995 CheckedUint32 maxSize
= aMaxSize
;
997 uint32_t streamCount
= mStreams
.Length();
999 for (uint32_t index
= 0; index
< streamCount
; index
++) {
1000 uint32_t sizeUsed
= 0;
1002 uint32_t transferables
= 0;
1003 InputStreamHelper::SerializedComplexity(mStreams
[index
].mOriginalStream
,
1004 maxSize
.value(), &sizeUsed
, &pipes
,
1007 MOZ_ASSERT(maxSize
.value() >= sizeUsed
);
1009 maxSize
-= sizeUsed
;
1010 MOZ_DIAGNOSTIC_ASSERT(maxSize
.isValid());
1011 totalSizeUsed
+= sizeUsed
;
1012 MOZ_DIAGNOSTIC_ASSERT(totalSizeUsed
.isValid());
1013 totalPipes
+= pipes
;
1014 MOZ_DIAGNOSTIC_ASSERT(totalPipes
.isValid());
1015 totalTransferables
+= transferables
;
1016 MOZ_DIAGNOSTIC_ASSERT(totalTransferables
.isValid());
1019 // If the combination of all streams when serialized independently is
1020 // sufficiently complex, we may choose to serialize it as a pipe to limit the
1021 // complexity of the payload.
1022 if (totalTransferables
.value() == 0) {
1023 // If there are no transferables within our serialization, and it would
1024 // contain at least one pipe, serialize the entire payload as a pipe for
1026 *aSerializeAsPipe
= totalSizeUsed
.value() > 0 && totalPipes
.value() > 0;
1028 // Otherwise, we may want to still serialize in segments to take advantage
1029 // of the efficiency of serializing transferables. We'll only serialize as a
1030 // pipe if the total attachment count exceeds kMaxAttachmentThreshold.
1031 static constexpr uint32_t kMaxAttachmentThreshold
= 8;
1032 CheckedUint32 totalAttachments
= totalPipes
+ totalTransferables
;
1033 *aSerializeAsPipe
= !totalAttachments
.isValid() ||
1034 totalAttachments
.value() > kMaxAttachmentThreshold
;
1037 if (*aSerializeAsPipe
) {
1039 nsPrintfCString("Choosing to serialize multiplex stream as a pipe "
1040 "(would be %u bytes, %u pipes, %u transferables)",
1041 totalSizeUsed
.value(), totalPipes
.value(),
1042 totalTransferables
.value())
1046 *aTransferables
= 0;
1048 *aSizeUsed
= totalSizeUsed
.value();
1049 *aPipes
= totalPipes
.value();
1050 *aTransferables
= totalTransferables
.value();
1054 void nsMultiplexInputStream::Serialize(InputStreamParams
& aParams
,
1055 uint32_t aMaxSize
, uint32_t* aSizeUsed
) {
1056 MutexAutoLock
lock(mLock
);
1058 // Check if we should serialize this stream as a pipe to reduce complexity.
1059 uint32_t dummySizeUsed
= 0, dummyPipes
= 0, dummyTransferables
= 0;
1060 bool serializeAsPipe
= false;
1061 SerializedComplexityInternal(aMaxSize
, &dummySizeUsed
, &dummyPipes
,
1062 &dummyTransferables
, &serializeAsPipe
);
1063 if (serializeAsPipe
) {
1065 MutexAutoUnlock
unlock(mLock
);
1066 InputStreamHelper::SerializeInputStreamAsPipe(this, aParams
);
1070 MultiplexInputStreamParams params
;
1072 CheckedUint32 totalSizeUsed
= 0;
1073 CheckedUint32 maxSize
= aMaxSize
;
1075 uint32_t streamCount
= mStreams
.Length();
1077 nsTArray
<InputStreamParams
>& streams
= params
.streams();
1079 streams
.SetCapacity(streamCount
);
1080 for (uint32_t index
= 0; index
< streamCount
; index
++) {
1081 uint32_t sizeUsed
= 0;
1082 InputStreamHelper::SerializeInputStream(mStreams
[index
].mOriginalStream
,
1083 *streams
.AppendElement(),
1084 maxSize
.value(), &sizeUsed
);
1086 MOZ_ASSERT(maxSize
.value() >= sizeUsed
);
1088 maxSize
-= sizeUsed
;
1089 MOZ_DIAGNOSTIC_ASSERT(maxSize
.isValid());
1091 totalSizeUsed
+= sizeUsed
;
1092 MOZ_DIAGNOSTIC_ASSERT(totalSizeUsed
.isValid());
1096 params
.currentStream() = mCurrentStream
;
1097 params
.status() = mStatus
;
1098 params
.startedReadingCurrent() = mStartedReadingCurrent
;
1100 aParams
= std::move(params
);
1102 MOZ_ASSERT(aSizeUsed
);
1103 *aSizeUsed
= totalSizeUsed
.value();
1106 bool nsMultiplexInputStream::Deserialize(const InputStreamParams
& aParams
) {
1107 if (aParams
.type() != InputStreamParams::TMultiplexInputStreamParams
) {
1108 NS_ERROR("Received unknown parameters from the other process!");
1112 const MultiplexInputStreamParams
& params
=
1113 aParams
.get_MultiplexInputStreamParams();
1115 const nsTArray
<InputStreamParams
>& streams
= params
.streams();
1117 uint32_t streamCount
= streams
.Length();
1118 for (uint32_t index
= 0; index
< streamCount
; index
++) {
1119 nsCOMPtr
<nsIInputStream
> stream
=
1120 InputStreamHelper::DeserializeInputStream(streams
[index
]);
1122 NS_WARNING("Deserialize failed!");
1126 if (NS_FAILED(AppendStream(stream
))) {
1127 NS_WARNING("AppendStream failed!");
1132 MutexAutoLock
lock(mLock
);
1133 mCurrentStream
= params
.currentStream();
1134 mStatus
= params
.status();
1135 mStartedReadingCurrent
= params
.startedReadingCurrent();
1141 nsMultiplexInputStream::GetCloneable(bool* aCloneable
) {
1142 MutexAutoLock
lock(mLock
);
1143 // XXXnsm Cloning a multiplex stream which has started reading is not
1144 // permitted right now.
1145 if (mCurrentStream
> 0 || mStartedReadingCurrent
) {
1146 *aCloneable
= false;
1150 uint32_t len
= mStreams
.Length();
1151 for (uint32_t i
= 0; i
< len
; ++i
) {
1152 nsCOMPtr
<nsICloneableInputStream
> cis
=
1153 do_QueryInterface(mStreams
[i
].mBufferedStream
);
1154 if (!cis
|| !cis
->GetCloneable()) {
1155 *aCloneable
= false;
1165 nsMultiplexInputStream::Clone(nsIInputStream
** aClone
) {
1166 MutexAutoLock
lock(mLock
);
1168 // XXXnsm Cloning a multiplex stream which has started reading is not
1169 // permitted right now.
1170 if (mCurrentStream
> 0 || mStartedReadingCurrent
) {
1171 return NS_ERROR_FAILURE
;
1174 RefPtr
<nsMultiplexInputStream
> clone
= new nsMultiplexInputStream();
1177 uint32_t len
= mStreams
.Length();
1178 for (uint32_t i
= 0; i
< len
; ++i
) {
1179 nsCOMPtr
<nsICloneableInputStream
> substream
=
1180 do_QueryInterface(mStreams
[i
].mBufferedStream
);
1181 if (NS_WARN_IF(!substream
)) {
1182 return NS_ERROR_FAILURE
;
1185 nsCOMPtr
<nsIInputStream
> clonedSubstream
;
1186 rv
= substream
->Clone(getter_AddRefs(clonedSubstream
));
1187 if (NS_WARN_IF(NS_FAILED(rv
))) {
1191 rv
= clone
->AppendStream(clonedSubstream
);
1192 if (NS_WARN_IF(NS_FAILED(rv
))) {
1197 clone
.forget(aClone
);
1202 nsMultiplexInputStream::Length(int64_t* aLength
) {
1203 MutexAutoLock
lock(mLock
);
1205 if (mCurrentStream
> 0 || mStartedReadingCurrent
) {
1206 return NS_ERROR_NOT_AVAILABLE
;
1209 CheckedInt64 length
= 0;
1210 nsresult retval
= NS_OK
;
1212 for (uint32_t i
= 0, len
= mStreams
.Length(); i
< len
; ++i
) {
1213 nsCOMPtr
<nsIInputStreamLength
> substream
=
1214 do_QueryInterface(mStreams
[i
].mBufferedStream
);
1216 // Let's use available as fallback.
1217 uint64_t streamAvail
= 0;
1218 nsresult rv
= AvailableMaybeSeek(mStreams
[i
], &streamAvail
);
1219 if (rv
== NS_BASE_STREAM_CLOSED
) {
1223 if (NS_WARN_IF(NS_FAILED(rv
))) {
1228 length
+= streamAvail
;
1229 if (!length
.isValid()) {
1230 return NS_ERROR_OUT_OF_MEMORY
;
1237 nsresult rv
= substream
->Length(&size
);
1238 if (rv
== NS_BASE_STREAM_CLOSED
) {
1242 if (rv
== NS_ERROR_NOT_AVAILABLE
) {
1246 // If one stream blocks, we all block.
1247 if (rv
!= NS_BASE_STREAM_WOULD_BLOCK
&& NS_WARN_IF(NS_FAILED(rv
))) {
1251 // We want to return WOULD_BLOCK if there is 1 stream that blocks. But want
1252 // to see if there are other streams with length = -1.
1253 if (rv
== NS_BASE_STREAM_WOULD_BLOCK
) {
1254 retval
= NS_BASE_STREAM_WOULD_BLOCK
;
1258 // If one of the streams doesn't know the size, we all don't know the size.
1265 if (!length
.isValid()) {
1266 return NS_ERROR_OUT_OF_MEMORY
;
1270 *aLength
= length
.value();
1274 class nsMultiplexInputStream::AsyncWaitLengthHelper final
1275 : public nsIInputStreamLengthCallback
{
1277 NS_DECL_THREADSAFE_ISUPPORTS
1279 AsyncWaitLengthHelper()
1280 : mStreamNotified(false), mLength(0), mNegativeSize(false) {}
1282 bool AddStream(nsIAsyncInputStreamLength
* aStream
) {
1283 return mPendingStreams
.AppendElement(aStream
, fallible
);
1286 bool AddSize(int64_t aSize
) {
1287 MOZ_ASSERT(!mNegativeSize
);
1290 return mLength
.isValid();
1293 void NegativeSize() {
1294 MOZ_ASSERT(!mNegativeSize
);
1295 mNegativeSize
= true;
1298 nsresult
Proceed(nsMultiplexInputStream
* aParentStream
,
1299 nsIEventTarget
* aEventTarget
,
1300 const MutexAutoLock
& aProofOfLock
) {
1301 MOZ_ASSERT(!mStream
);
1303 // If we don't need to wait, let's inform the callback immediately.
1304 if (mPendingStreams
.IsEmpty() || mNegativeSize
) {
1305 RefPtr
<nsMultiplexInputStream
> parentStream
= aParentStream
;
1306 int64_t length
= -1;
1307 if (!mNegativeSize
&& mLength
.isValid()) {
1308 length
= mLength
.value();
1310 nsCOMPtr
<nsIRunnable
> r
= NS_NewRunnableFunction(
1311 "AsyncWaitLengthHelper", [parentStream
, length
]() {
1312 MutexAutoLock
lock(parentStream
->GetLock());
1313 parentStream
->AsyncWaitCompleted(length
, lock
);
1315 return aEventTarget
->Dispatch(r
.forget(), NS_DISPATCH_NORMAL
);
1318 // Let's store the callback and the parent stream until we have
1319 // notifications from the async length streams.
1321 mStream
= aParentStream
;
1323 // Let's activate all the pending streams.
1324 for (nsIAsyncInputStreamLength
* stream
: mPendingStreams
) {
1325 nsresult rv
= stream
->AsyncLengthWait(this, aEventTarget
);
1326 if (rv
== NS_BASE_STREAM_CLOSED
) {
1330 if (NS_WARN_IF(NS_FAILED(rv
))) {
1339 OnInputStreamLengthReady(nsIAsyncInputStreamLength
* aStream
,
1340 int64_t aLength
) override
{
1341 MutexAutoLock
lock(mStream
->GetLock());
1343 MOZ_ASSERT(mPendingStreams
.Contains(aStream
));
1344 mPendingStreams
.RemoveElement(aStream
);
1346 // Already notified.
1347 if (mStreamNotified
) {
1351 if (aLength
== -1) {
1352 mNegativeSize
= true;
1355 if (!mLength
.isValid()) {
1356 mNegativeSize
= true;
1361 if (!mNegativeSize
&& !mPendingStreams
.IsEmpty()) {
1365 // Let's notify the parent stream.
1366 mStreamNotified
= true;
1367 mStream
->AsyncWaitCompleted(mNegativeSize
? -1 : mLength
.value(), lock
);
1372 ~AsyncWaitLengthHelper() = default;
1374 RefPtr
<nsMultiplexInputStream
> mStream
;
1375 bool mStreamNotified
;
1377 CheckedInt64 mLength
;
1380 nsTArray
<nsCOMPtr
<nsIAsyncInputStreamLength
>> mPendingStreams
;
1383 NS_IMPL_ISUPPORTS(nsMultiplexInputStream::AsyncWaitLengthHelper
,
1384 nsIInputStreamLengthCallback
)
1387 nsMultiplexInputStream::AsyncLengthWait(nsIInputStreamLengthCallback
* aCallback
,
1388 nsIEventTarget
* aEventTarget
) {
1389 if (NS_WARN_IF(!aEventTarget
)) {
1390 return NS_ERROR_NULL_POINTER
;
1393 MutexAutoLock
lock(mLock
);
1395 if (mCurrentStream
> 0 || mStartedReadingCurrent
) {
1396 return NS_ERROR_NOT_AVAILABLE
;
1400 mAsyncWaitLengthCallback
= nullptr;
1404 // We have a pending operation! Let's use this instead of creating a new one.
1405 if (mAsyncWaitLengthHelper
) {
1406 mAsyncWaitLengthCallback
= aCallback
;
1410 RefPtr
<AsyncWaitLengthHelper
> helper
= new AsyncWaitLengthHelper();
1412 for (uint32_t i
= 0, len
= mStreams
.Length(); i
< len
; ++i
) {
1413 nsCOMPtr
<nsIAsyncInputStreamLength
> asyncStream
=
1414 do_QueryInterface(mStreams
[i
].mBufferedStream
);
1416 if (NS_WARN_IF(!helper
->AddStream(asyncStream
))) {
1417 return NS_ERROR_OUT_OF_MEMORY
;
1422 nsCOMPtr
<nsIInputStreamLength
> stream
=
1423 do_QueryInterface(mStreams
[i
].mBufferedStream
);
1425 // Let's use available as fallback.
1426 uint64_t streamAvail
= 0;
1427 nsresult rv
= AvailableMaybeSeek(mStreams
[i
], &streamAvail
);
1428 if (rv
== NS_BASE_STREAM_CLOSED
) {
1432 if (NS_WARN_IF(NS_FAILED(rv
))) {
1437 if (NS_WARN_IF(!helper
->AddSize(streamAvail
))) {
1438 return NS_ERROR_OUT_OF_MEMORY
;
1445 nsresult rv
= stream
->Length(&size
);
1446 if (rv
== NS_BASE_STREAM_CLOSED
) {
1450 MOZ_ASSERT(rv
!= NS_BASE_STREAM_WOULD_BLOCK
,
1451 "A nsILengthInutStream returns NS_BASE_STREAM_WOULD_BLOCK but "
1452 "it doesn't implement nsIAsyncInputStreamLength.");
1454 if (NS_WARN_IF(NS_FAILED(rv
))) {
1459 helper
->NegativeSize();
1463 if (NS_WARN_IF(!helper
->AddSize(size
))) {
1464 return NS_ERROR_OUT_OF_MEMORY
;
1468 nsresult rv
= helper
->Proceed(this, aEventTarget
, lock
);
1469 if (NS_WARN_IF(NS_FAILED(rv
))) {
1473 mAsyncWaitLengthHelper
= helper
;
1474 mAsyncWaitLengthCallback
= aCallback
;
1478 void nsMultiplexInputStream::AsyncWaitCompleted(
1479 int64_t aLength
, const MutexAutoLock
& aProofOfLock
) {
1480 mLock
.AssertCurrentThreadOwns();
1482 nsCOMPtr
<nsIInputStreamLengthCallback
> callback
;
1483 callback
.swap(mAsyncWaitLengthCallback
);
1485 mAsyncWaitLengthHelper
= nullptr;
1487 // Already canceled.
1492 MutexAutoUnlock
unlock(mLock
);
1493 callback
->OnInputStreamLengthReady(this, aLength
);
1496 #define MAYBE_UPDATE_VALUE_REAL(x, y) \
1501 #define MAYBE_UPDATE_VALUE(x, y) \
1503 nsCOMPtr<y> substream = do_QueryInterface(aStream.mBufferedStream); \
1504 MAYBE_UPDATE_VALUE_REAL(x, substream) \
1507 #define MAYBE_UPDATE_BOOL(x, y) \
1509 nsCOMPtr<y> substream = do_QueryInterface(aStream.mBufferedStream); \
1515 void nsMultiplexInputStream::UpdateQIMap(StreamData
& aStream
) {
1516 auto length
= mStreams
.Length();
1518 MAYBE_UPDATE_VALUE_REAL(mSeekableStreams
, aStream
.mSeekableStream
)
1519 mIsSeekableStream
= (mSeekableStreams
== length
);
1520 MAYBE_UPDATE_VALUE(mIPCSerializableStreams
, nsIIPCSerializableInputStream
)
1521 mIsIPCSerializableStream
= (mIPCSerializableStreams
== length
);
1522 MAYBE_UPDATE_VALUE(mCloneableStreams
, nsICloneableInputStream
)
1523 mIsCloneableStream
= (mCloneableStreams
== length
);
1524 // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
1525 // substream implements that interface
1526 if (!mIsAsyncInputStream
&& aStream
.mAsyncStream
) {
1527 mIsAsyncInputStream
= true;
1529 MAYBE_UPDATE_BOOL(mIsInputStreamLength
, nsIInputStreamLength
)
1530 MAYBE_UPDATE_BOOL(mIsAsyncInputStreamLength
, nsIAsyncInputStreamLength
)
1533 #undef MAYBE_UPDATE_VALUE
1534 #undef MAYBE_UPDATE_VALUE_REAL
1535 #undef MAYBE_UPDATE_BOOL
1537 bool nsMultiplexInputStream::IsSeekable() const { return mIsSeekableStream
; }
1539 bool nsMultiplexInputStream::IsIPCSerializable() const {
1540 return mIsIPCSerializableStream
;
1543 bool nsMultiplexInputStream::IsCloneable() const { return mIsCloneableStream
; }
1545 bool nsMultiplexInputStream::IsAsyncInputStream() const {
1546 // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
1547 // substream implements that interface.
1548 return mIsAsyncInputStream
;
1551 bool nsMultiplexInputStream::IsInputStreamLength() const {
1552 return mIsInputStreamLength
;
1555 bool nsMultiplexInputStream::IsAsyncInputStreamLength() const {
1556 return mIsAsyncInputStreamLength
;