1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 * The multiplex stream concatenates a list of input streams into a single
12 #include "mozilla/Attributes.h"
13 #include "mozilla/CheckedInt.h"
14 #include "mozilla/MathAlgorithms.h"
15 #include "mozilla/Mutex.h"
17 #include "base/basictypes.h"
19 #include "nsMultiplexInputStream.h"
20 #include "nsIBufferedStreams.h"
21 #include "nsICloneableInputStream.h"
22 #include "nsIMultiplexInputStream.h"
23 #include "nsISeekableStream.h"
25 #include "nsCOMArray.h"
26 #include "nsIClassInfoImpl.h"
27 #include "nsIIPCSerializableInputStream.h"
28 #include "mozilla/ipc/InputStreamUtils.h"
29 #include "nsIAsyncInputStream.h"
30 #include "nsIInputStreamLength.h"
31 #include "nsNetUtil.h"
32 #include "nsStreamUtils.h"
34 using namespace mozilla
;
35 using namespace mozilla::ipc
;
37 using mozilla::DeprecatedAbs
;
39 using mozilla::Nothing
;
// Input stream that presents the sub-streams stored in mStreams as one
// stream. In addition to nsIMultiplexInputStream it declares the seekable,
// IPC-serializable, cloneable, async, callback and length interfaces listed
// in the base-class list below.
42 class nsMultiplexInputStream final
: public nsIMultiplexInputStream
,
43 public nsISeekableStream
,
44 public nsIIPCSerializableInputStream
,
45 public nsICloneableInputStream
,
46 public nsIAsyncInputStream
,
47 public nsIInputStreamCallback
,
48 public nsIInputStreamLength
,
49 public nsIAsyncInputStreamLength
{
51 nsMultiplexInputStream();
53 NS_DECL_THREADSAFE_ISUPPORTS
54 NS_DECL_NSIINPUTSTREAM
55 NS_DECL_NSIMULTIPLEXINPUTSTREAM
56 NS_DECL_NSISEEKABLESTREAM
57 NS_DECL_NSITELLABLESTREAM
58 NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
59 NS_DECL_NSICLONEABLEINPUTSTREAM
60 NS_DECL_NSIASYNCINPUTSTREAM
61 NS_DECL_NSIINPUTSTREAMCALLBACK
62 NS_DECL_NSIINPUTSTREAMLENGTH
63 NS_DECL_NSIASYNCINPUTSTREAMLENGTH
65 // This is used for nsIAsyncInputStream::AsyncWait
66 void AsyncWaitCompleted();
68 // This is used for nsIAsyncInputStreamLength::AsyncLengthWait
69 void AsyncWaitCompleted(int64_t aLength
, const MutexAutoLock
& aProofOfLock
);
// Per-sub-stream setup. Records aOriginalStream, then makes mBufferedStream
// either the same stream or, when NS_InputStreamIsBuffered() says it is not
// buffered, a 4096-byte NS_NewBufferedInputStream wrapper around it. The
// async and seekable views are queried from mBufferedStream afterwards;
// callers test them for null before use.
72 nsresult
Initialize(nsIInputStream
* aOriginalStream
) {
75 mOriginalStream
= aOriginalStream
;
77 mBufferedStream
= aOriginalStream
;
78 if (!NS_InputStreamIsBuffered(mBufferedStream
)) {
79 nsCOMPtr
<nsIInputStream
> bufferedStream
;
80 nsresult rv
= NS_NewBufferedInputStream(getter_AddRefs(bufferedStream
),
81 mBufferedStream
.forget(), 4096);
82 NS_ENSURE_SUCCESS(rv
, rv
);
83 mBufferedStream
= bufferedStream
;
86 mAsyncStream
= do_QueryInterface(mBufferedStream
);
87 mSeekableStream
= do_QueryInterface(mBufferedStream
);
// The stream passed to Initialize(); this is what GetStream() hands back.
92 nsCOMPtr
<nsIInputStream
> mOriginalStream
;
94 // Equal to mOriginalStream, or a buffered wrapper around it when the original stream is not already buffered.
96 nsCOMPtr
<nsIInputStream
> mBufferedStream
;
// mBufferedStream queried as nsIAsyncInputStream (null when unsupported).
99 nsCOMPtr
<nsIAsyncInputStream
> mAsyncStream
;
// mBufferedStream queried as nsISeekableStream (null when unsupported).
101 nsCOMPtr
<nsISeekableStream
> mSeekableStream
;
// Position inside this sub-stream as tracked by Read(), ReadSegments() and
// Seek().
103 uint64_t mCurrentPos
;
// Gives AsyncWaitLengthHelper access to the stream's mutex.
106 Mutex
& GetLock() { return mLock
; }
109 ~nsMultiplexInputStream() = default;
111 nsresult
AsyncWaitInternal();
113 // This method updates the per-interface counters: mSeekableStreams,
114 // mIPCSerializableStreams, mCloneableStreams, mAsyncInputStreams, mInputStreamLengths and mAsyncInputStreamLengths.
115 void UpdateQIMap(StreamData
& aStream
);
// Closure handed to ReadSegCb() through ReadSegments(): carries the
// multiplex stream and the caller's writer so the callback can forward to it.
117 struct MOZ_STACK_CLASS ReadSegmentsState
{
118 nsCOMPtr
<nsIInputStream
> mThisStream
;
120 nsWriteSegmentFun mWriter
;
// Shared implementation behind both Serialize() overloads; M is the actor
// manager type.
125 template <typename M
>
126 void SerializeInternal(InputStreamParams
& aParams
,
127 FileDescriptorArray
& aFileDescriptors
,
128 bool aDelayedStart
, uint32_t aMaxSize
,
129 uint32_t* aSizeUsed
, M
* aManager
);
131 static nsresult
ReadSegCb(nsIInputStream
* aIn
, void* aClosure
,
132 const char* aFromRawSegment
, uint32_t aToOffset
,
133 uint32_t aCount
, uint32_t* aWriteCount
);
// Predicates used by the conditional QueryInterface entries; each compares
// or tests one of the counters declared at the end of the class.
135 bool IsSeekable() const;
136 bool IsIPCSerializable() const;
137 bool IsCloneable() const;
138 bool IsAsyncInputStream() const;
139 bool IsInputStreamLength() const;
140 bool IsAsyncInputStreamLength() const;
142 Mutex mLock
; // Protects access to all data members.
144 nsTArray
<StreamData
> mStreams
;
// Index into mStreams of the sub-stream currently being read, and whether
// any data of that sub-stream has been consumed yet.
146 uint32_t mCurrentStream
;
147 bool mStartedReadingCurrent
;
// Pending nsIAsyncInputStream::AsyncWait request (callback plus the
// arguments it was registered with).
149 nsCOMPtr
<nsIInputStreamCallback
> mAsyncWaitCallback
;
150 uint32_t mAsyncWaitFlags
;
151 uint32_t mAsyncWaitRequestedCount
;
152 nsCOMPtr
<nsIEventTarget
> mAsyncWaitEventTarget
;
// Pending nsIAsyncInputStreamLength::AsyncLengthWait callback.
153 nsCOMPtr
<nsIInputStreamLengthCallback
> mAsyncWaitLengthCallback
;
155 class AsyncWaitLengthHelper
;
156 RefPtr
<AsyncWaitLengthHelper
> mAsyncWaitLengthHelper
;
// Number of sub-streams supporting each optional interface; incremented by
// UpdateQIMap() and read by the Is*() predicates.
158 uint32_t mSeekableStreams
;
159 uint32_t mIPCSerializableStreams
;
160 uint32_t mCloneableStreams
;
161 uint32_t mAsyncInputStreams
;
162 uint32_t mInputStreamLengths
;
163 uint32_t mAsyncInputStreamLengths
;
// Reference counting, class info and QueryInterface table for
// nsMultiplexInputStream. Entries declared with
// NS_INTERFACE_MAP_ENTRY_CONDITIONAL are paired with one of the Is*()
// predicates, which are computed from the sub-streams appended so far.
166 NS_IMPL_ADDREF(nsMultiplexInputStream
)
167 NS_IMPL_RELEASE(nsMultiplexInputStream
)
169 NS_IMPL_CLASSINFO(nsMultiplexInputStream
, nullptr, nsIClassInfo::THREADSAFE
,
170 NS_MULTIPLEXINPUTSTREAM_CID
)
172 NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream
)
173 NS_INTERFACE_MAP_ENTRY(nsIMultiplexInputStream
)
174 NS_INTERFACE_MAP_ENTRY(nsIInputStream
)
175 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream
, IsSeekable())
176 NS_INTERFACE_MAP_ENTRY(nsITellableStream
)
177 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream
,
179 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream
, IsCloneable())
180 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream
, IsAsyncInputStream())
// nsIInputStreamCallback is gated on the same predicate as
// nsIAsyncInputStream.
181 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback
,
182 IsAsyncInputStream())
183 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength
,
184 IsInputStreamLength())
185 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength
,
186 IsAsyncInputStreamLength())
187 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports
, nsIMultiplexInputStream
)
188 NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream
)
191 NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream
, nsIMultiplexInputStream
,
192 nsIInputStream
, nsISeekableStream
,
// Queries Available() on the sub-stream's buffered stream. When that reports
// NS_BASE_STREAM_CLOSED and the sub-stream is seekable, performs a zero-byte
// NS_SEEK_CUR seek and, if the seek succeeds, asks Available() once more.
195 static nsresult
AvailableMaybeSeek(nsMultiplexInputStream::StreamData
& aStream
,
197 nsresult rv
= aStream
.mBufferedStream
->Available(aResult
);
198 if (rv
== NS_BASE_STREAM_CLOSED
) {
199 // Blindly seek to the current position if Available() returns
200 // NS_BASE_STREAM_CLOSED.
201 // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
202 // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
203 if (aStream
.mSeekableStream
) {
205 aStream
.mSeekableStream
->Seek(nsISeekableStream::NS_SEEK_CUR
, 0);
// Only retry Available() when the seek itself worked; otherwise the
// original CLOSED result stays in rv.
206 if (NS_SUCCEEDED(rvSeek
)) {
207 rv
= aStream
.mBufferedStream
->Available(aResult
);
// Creates an empty multiplex stream: names the mutex and starts the visible
// flags and per-interface counters at false / zero.
214 nsMultiplexInputStream::nsMultiplexInputStream()
215 : mLock("nsMultiplexInputStream lock"),
217 mStartedReadingCurrent(false),
220 mAsyncWaitRequestedCount(0),
222 mIPCSerializableStreams(0),
223 mCloneableStreams(0),
224 mAsyncInputStreams(0),
225 mInputStreamLengths(0),
226 mAsyncInputStreamLengths(0) {}
// Writes the number of sub-streams currently held in mStreams to *aCount,
// under mLock.
229 nsMultiplexInputStream::GetCount(uint32_t* aCount
) {
230 MutexAutoLock
lock(mLock
);
231 *aCount
= mStreams
.Length();
// Appends aStream as a new sub-stream. The StreamData slot is allocated
// fallibly (NS_ERROR_OUT_OF_MEMORY on failure), initialized from aStream, and
// then counted into the per-interface totals via UpdateQIMap().
236 nsMultiplexInputStream::AppendStream(nsIInputStream
* aStream
) {
237 MutexAutoLock
lock(mLock
);
239 StreamData
* streamData
= mStreams
.AppendElement(fallible
);
240 if (NS_WARN_IF(!streamData
)) {
241 return NS_ERROR_OUT_OF_MEMORY
;
// NOTE(review): if Initialize() fails, the slot appended above appears to
// remain in mStreams -- confirm whether that is intended.
244 nsresult rv
= streamData
->Initialize(aStream
);
245 NS_ENSURE_SUCCESS(rv
, rv
);
247 UpdateQIMap(*streamData
);
249 if (mStatus
== NS_BASE_STREAM_CLOSED
) {
250 // We were closed, but now we have more data to read.
// Returns, with an added reference, the original (unwrapped) stream stored
// at aIndex. An index at or past the end of mStreams yields
// NS_ERROR_NOT_AVAILABLE.
258 nsMultiplexInputStream::GetStream(uint32_t aIndex
, nsIInputStream
** aResult
) {
259 MutexAutoLock
lock(mLock
);
261 if (aIndex
>= mStreams
.Length()) {
262 return NS_ERROR_NOT_AVAILABLE
;
265 StreamData
& streamData
= mStreams
.ElementAt(aIndex
);
// Hand out mOriginalStream rather than the buffered wrapper.
266 nsCOMPtr
<nsIInputStream
> stream
= streamData
.mOriginalStream
;
267 stream
.forget(aResult
);
// Marks the multiplex stream as closed and closes every sub-stream. The
// buffered streams are first copied into a local array while mLock is held,
// and Close() is then invoked on the copies.
272 nsMultiplexInputStream::Close() {
273 nsTArray
<nsCOMPtr
<nsIInputStream
>> streams
;
275 // Let's take a copy of the streams because calling Close() on them could trigger
276 // an nsIInputStreamCallback immediately, and we don't want to create a deadlock
279 MutexAutoLock
lock(mLock
);
280 uint32_t len
= mStreams
.Length();
281 for (uint32_t i
= 0; i
< len
; ++i
) {
283 !streams
.AppendElement(mStreams
[i
].mBufferedStream
, fallible
))) {
// Even when the copy cannot be completed the stream is still flagged as
// closed before reporting the allocation failure.
284 mStatus
= NS_BASE_STREAM_CLOSED
;
285 return NS_ERROR_OUT_OF_MEMORY
;
288 mStatus
= NS_BASE_STREAM_CLOSED
;
293 uint32_t len
= streams
.Length();
294 for (uint32_t i
= 0; i
< len
; ++i
) {
295 nsresult rv2
= streams
[i
]->Close();
296 // We still want to close all streams, but we should return an error
297 if (NS_FAILED(rv2
)) {
// Sums the bytes available across the sub-streams, starting at
// mCurrentStream. Closed sub-streams are skipped, and accumulation stops at
// the first async sub-stream (see the comment inside the loop).
306 nsMultiplexInputStream::Available(uint64_t* aResult
) {
309 MutexAutoLock
lock(mLock
);
310 if (NS_FAILED(mStatus
)) {
// Default result when the loop below never runs.
315 nsresult rv
= NS_BASE_STREAM_CLOSED
;
317 uint32_t len
= mStreams
.Length();
318 for (uint32_t i
= mCurrentStream
; i
< len
; i
++) {
319 uint64_t streamAvail
;
320 rv
= AvailableMaybeSeek(mStreams
[i
], &streamAvail
);
321 if (rv
== NS_BASE_STREAM_CLOSED
) {
322 // If a stream is closed, we continue with the next one.
323 // If this is the current stream we move to the following stream.
324 if (mCurrentStream
== i
) {
328 // If this is the last stream, we want to return this error code.
332 if (NS_WARN_IF(NS_FAILED(rv
))) {
337 // If the current stream is async, we have to return what we have so far
338 // without processing the following streams. This is needed because
339 // ::Available should return only what is currently available. In case of an
340 // nsIAsyncInputStream, we have to call AsyncWait() in order to read more.
341 if (mStreams
[i
].mAsyncStream
) {
342 avail
+= streamAvail
;
346 if (streamAvail
== 0) {
347 // Nothing to read for this stream. Let's move to the next one.
351 avail
+= streamAvail
;
354 // We still have something to read. We don't want to return an error code yet.
360 // Let's propagate the last error message.
// Copies up to aCount bytes into aBuf by reading from the sub-streams in
// order, beginning with mCurrentStream. The final return is NS_OK when any
// bytes were produced, otherwise the last sub-stream result.
366 nsMultiplexInputStream::Read(char* aBuf
, uint32_t aCount
, uint32_t* aResult
) {
367 MutexAutoLock
lock(mLock
);
368 // It is tempting to implement this method in terms of ReadSegments, but
369 // that would prevent this class from being used with streams that only
370 // implement Read (e.g., file streams).
374 if (mStatus
== NS_BASE_STREAM_CLOSED
) {
377 if (NS_FAILED(mStatus
)) {
383 uint32_t len
= mStreams
.Length();
384 while (mCurrentStream
< len
&& aCount
) {
386 rv
= mStreams
[mCurrentStream
].mBufferedStream
->Read(aBuf
, aCount
, &read
);
388 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
389 // (This is a bug in those stream implementations)
390 if (rv
== NS_BASE_STREAM_CLOSED
) {
391 MOZ_ASSERT_UNREACHABLE(
392 "Input stream's Read method returned "
393 "NS_BASE_STREAM_CLOSED");
396 } else if (NS_FAILED(rv
)) {
// Reset when moving on to a sub-stream that has not been read yet.
402 mStartedReadingCurrent
= false;
404 NS_ASSERTION(aCount
>= read
, "Read more than requested");
408 mStartedReadingCurrent
= true;
// Keep the per-stream position in step with what was consumed; Seek() and
// Tell() rely on it.
410 mStreams
[mCurrentStream
].mCurrentPos
+= read
;
413 return *aResult
? NS_OK
: rv
;
// Segment-based counterpart of Read(): forwards to ReadSegments() of each
// sub-stream in turn, routing the data through ReadSegCb() so that aWriter
// sees this multiplex stream and an offset relative to the whole call.
// *aResult receives the total number of bytes handed to aWriter.
417 nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
418 uint32_t aCount
, uint32_t* aResult
) {
419 MutexAutoLock
lock(mLock
);
421 if (mStatus
== NS_BASE_STREAM_CLOSED
) {
425 if (NS_FAILED(mStatus
)) {
429 NS_ASSERTION(aWriter
, "missing aWriter");
// Closure passed to ReadSegCb(); carries the caller's writer/closure and the
// running offset.
432 ReadSegmentsState state
;
433 state
.mThisStream
= this;
435 state
.mWriter
= aWriter
;
436 state
.mClosure
= aClosure
;
439 uint32_t len
= mStreams
.Length();
440 while (mCurrentStream
< len
&& aCount
) {
442 rv
= mStreams
[mCurrentStream
].mBufferedStream
->ReadSegments(
443 ReadSegCb
, &state
, aCount
, &read
);
445 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
446 // (This is a bug in those stream implementations)
447 if (rv
== NS_BASE_STREAM_CLOSED
) {
448 MOZ_ASSERT_UNREACHABLE(
449 "Input stream's Read method returned "
450 "NS_BASE_STREAM_CLOSED");
455 // if |aWriter| decided to stop reading segments...
456 if (state
.mDone
|| NS_FAILED(rv
)) {
460 // if stream is empty, then advance to the next stream.
463 mStartedReadingCurrent
= false;
465 NS_ASSERTION(aCount
>= read
, "Read more than requested");
466 state
.mOffset
+= read
;
468 mStartedReadingCurrent
= true;
470 mStreams
[mCurrentStream
].mCurrentPos
+= read
;
474 // if we successfully read some data, then this call succeeded.
475 *aResult
= state
.mOffset
;
476 return state
.mOffset
? NS_OK
: rv
;
// nsWriteSegmentFun adapter used by ReadSegments(). aClosure is the
// ReadSegmentsState built there; the call is forwarded to the caller's
// writer with the multiplex stream substituted for aIn and the offset
// shifted by the bytes already delivered (state->mOffset).
479 nsresult
nsMultiplexInputStream::ReadSegCb(nsIInputStream
* aIn
, void* aClosure
,
480 const char* aFromRawSegment
,
481 uint32_t aToOffset
, uint32_t aCount
,
482 uint32_t* aWriteCount
) {
484 ReadSegmentsState
* state
= (ReadSegmentsState
*)aClosure
;
485 rv
= (state
->mWriter
)(state
->mThisStream
, state
->mClosure
, aFromRawSegment
,
486 aToOffset
+ state
->mOffset
, aCount
, aWriteCount
);
// Reports whether the multiplex stream is non-blocking. *aNonBlocking starts
// out true and is then overwritten by each sub-stream's own answer; the loop
// reacts to the first sub-stream that reports blocking.
494 nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking
) {
495 MutexAutoLock
lock(mLock
);
497 uint32_t len
= mStreams
.Length();
499 // Claim to be non-blocking, since we won't block the caller.
500 *aNonBlocking
= true;
504 for (uint32_t i
= 0; i
< len
; ++i
) {
505 nsresult rv
= mStreams
[i
].mBufferedStream
->IsNonBlocking(aNonBlocking
);
506 if (NS_WARN_IF(NS_FAILED(rv
))) {
509 // If one is blocking the entire stream becomes blocking.
510 if (!*aNonBlocking
) {
// Repositions the multiplex stream. Four cases are handled in separate
// branches: NS_SEEK_SET, NS_SEEK_CUR with a positive offset, NS_SEEK_CUR
// with a negative offset, and NS_SEEK_END; NS_SEEK_CUR with offset 0 is only
// asserted on. Each branch seeks the affected sub-streams and keeps their
// mCurrentPos and mStartedReadingCurrent in step. Anything that falls
// through returns NS_ERROR_NOT_IMPLEMENTED.
519 nsMultiplexInputStream::Seek(int32_t aWhence
, int64_t aOffset
) {
520 MutexAutoLock
lock(mLock
);
522 if (NS_FAILED(mStatus
)) {
// Snapshot of the read cursor before it is modified; used below to decide
// which sub-streams have been touched so far.
528 uint32_t oldCurrentStream
= mCurrentStream
;
529 bool oldStartedReadingCurrent
= mStartedReadingCurrent
;
// --- Absolute seek: walk the sub-streams from index 0, consuming
// |remaining| until the target position is reached.
531 if (aWhence
== NS_SEEK_SET
) {
532 int64_t remaining
= aOffset
;
536 for (uint32_t i
= 0; i
< mStreams
.Length(); ++i
) {
537 nsCOMPtr
<nsISeekableStream
> stream
= mStreams
[i
].mSeekableStream
;
539 return NS_ERROR_FAILURE
;
542 // See if all remaining streams should be rewound
543 if (remaining
== 0) {
// Only sub-streams that were actually read from need rewinding.
544 if (i
< oldCurrentStream
||
545 (i
== oldCurrentStream
&& oldStartedReadingCurrent
)) {
546 rv
= stream
->Seek(NS_SEEK_SET
, 0);
547 if (NS_WARN_IF(NS_FAILED(rv
))) {
551 mStreams
[i
].mCurrentPos
= 0;
558 // Get position in the current stream
560 if (i
> oldCurrentStream
||
561 (i
== oldCurrentStream
&& !oldStartedReadingCurrent
)) {
564 streamPos
= mStreams
[i
].mCurrentPos
;
567 // See if we need to seek the current stream forward or backward
568 if (remaining
< streamPos
) {
569 rv
= stream
->Seek(NS_SEEK_SET
, remaining
);
570 if (NS_WARN_IF(NS_FAILED(rv
))) {
574 mStreams
[i
].mCurrentPos
= remaining
;
576 mStartedReadingCurrent
= remaining
!= 0;
579 } else if (remaining
> streamPos
) {
580 if (i
< oldCurrentStream
) {
581 // We're already at end so no need to seek this stream
582 remaining
-= streamPos
;
583 NS_ASSERTION(remaining
>= 0, "Remaining invalid");
586 rv
= AvailableMaybeSeek(mStreams
[i
], &avail
);
587 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Never seek past what this sub-stream can currently provide.
591 int64_t newPos
= XPCOM_MIN(remaining
, streamPos
+ (int64_t)avail
);
593 rv
= stream
->Seek(NS_SEEK_SET
, newPos
);
594 if (NS_WARN_IF(NS_FAILED(rv
))) {
598 mStreams
[i
].mCurrentPos
= newPos
;
600 mStartedReadingCurrent
= true;
603 NS_ASSERTION(remaining
>= 0, "Remaining invalid");
606 NS_ASSERTION(remaining
== streamPos
, "Huh?");
607 MOZ_ASSERT(remaining
!= 0, "Zero remaining should be handled earlier");
610 mStartedReadingCurrent
= true;
// --- Forward relative seek: advance through the sub-streams starting at
// the current one, moving at most Available() bytes in each.
617 if (aWhence
== NS_SEEK_CUR
&& aOffset
> 0) {
618 int64_t remaining
= aOffset
;
619 for (uint32_t i
= mCurrentStream
; remaining
&& i
< mStreams
.Length(); ++i
) {
621 rv
= AvailableMaybeSeek(mStreams
[i
], &avail
);
622 if (NS_WARN_IF(NS_FAILED(rv
))) {
626 int64_t seek
= XPCOM_MIN((int64_t)avail
, remaining
);
628 rv
= mStreams
[i
].mSeekableStream
->Seek(NS_SEEK_CUR
, seek
);
629 if (NS_WARN_IF(NS_FAILED(rv
))) {
633 mStreams
[i
].mCurrentPos
+= seek
;
635 mStartedReadingCurrent
= true;
// --- Backward relative seek: walk from the current sub-stream towards
// index 0, undoing at most mCurrentPos bytes in each. The loop ends when the
// unsigned index wraps past zero.
// NOTE(review): -aOffset overflows for the most negative int64_t value --
// confirm callers cannot pass it.
643 if (aWhence
== NS_SEEK_CUR
&& aOffset
< 0) {
644 int64_t remaining
= -aOffset
;
645 for (uint32_t i
= mCurrentStream
; remaining
&& i
!= (uint32_t)-1; --i
) {
646 int64_t pos
= mStreams
[i
].mCurrentPos
;
648 int64_t seek
= XPCOM_MIN(pos
, remaining
);
650 rv
= mStreams
[i
].mSeekableStream
->Seek(NS_SEEK_CUR
, -seek
);
651 if (NS_WARN_IF(NS_FAILED(rv
))) {
655 mStreams
[i
].mCurrentPos
-= seek
;
// NOTE(review): |seek| is min(pos, remaining) and so is not negative, which
// means `seek != -pos` is false only when both are zero. Confirm this is the
// intended "still inside this sub-stream" test rather than `seek != pos`.
657 mStartedReadingCurrent
= seek
!= -pos
;
665 if (aWhence
== NS_SEEK_CUR
) {
666 NS_ASSERTION(aOffset
== 0, "Should have handled all non-zero values");
// --- Seek relative to the end: walk the sub-streams from the last one
// backwards. |remaining| is the offset still to be applied.
// NOTE(review): the `i >= 0` loop conditions below need |i| to be a signed
// type; its declaration is not visible here.
671 if (aWhence
== NS_SEEK_END
) {
673 return NS_ERROR_INVALID_ARG
;
676 int64_t remaining
= aOffset
;
678 for (i
= mStreams
.Length() - 1; i
>= 0; --i
) {
679 nsCOMPtr
<nsISeekableStream
> stream
= mStreams
[i
].mSeekableStream
;
682 rv
= AvailableMaybeSeek(mStreams
[i
], &avail
);
683 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Total length of this sub-stream = unread bytes + bytes already consumed.
687 int64_t streamLength
= avail
+ mStreams
[i
].mCurrentPos
;
689 // The seek(END) can be completed in the current stream.
690 if (streamLength
>= DeprecatedAbs(remaining
)) {
691 rv
= stream
->Seek(NS_SEEK_END
, remaining
);
692 if (NS_WARN_IF(NS_FAILED(rv
))) {
696 mStreams
[i
].mCurrentPos
= streamLength
+ remaining
;
698 mStartedReadingCurrent
= true;
702 // We are at the beginning of this stream.
703 rv
= stream
->Seek(NS_SEEK_SET
, 0);
704 if (NS_WARN_IF(NS_FAILED(rv
))) {
708 remaining
+= streamLength
;
709 mStreams
[i
].mCurrentPos
= 0;
712 // Any other stream must be set to the end.
713 for (--i
; i
>= 0; --i
) {
714 nsCOMPtr
<nsISeekableStream
> stream
= mStreams
[i
].mSeekableStream
;
717 rv
= AvailableMaybeSeek(mStreams
[i
], &avail
);
718 if (NS_WARN_IF(NS_FAILED(rv
))) {
722 int64_t streamLength
= avail
+ mStreams
[i
].mCurrentPos
;
724 rv
= stream
->Seek(NS_SEEK_END
, 0);
725 if (NS_WARN_IF(NS_FAILED(rv
))) {
729 mStreams
[i
].mCurrentPos
= streamLength
;
735 // other Seeks not implemented yet
736 return NS_ERROR_NOT_IMPLEMENTED
;
// Computes the current position of the multiplex stream by adding up
// mCurrentPos of every sub-stream. In debug builds it also checks that once
// a sub-stream with position 0 is seen, all later ones are at 0 too.
740 nsMultiplexInputStream::Tell(int64_t* aResult
) {
741 MutexAutoLock
lock(mLock
);
743 if (NS_FAILED(mStatus
)) {
749 bool zeroFound
= false;
752 for (uint32_t i
= 0; i
< mStreams
.Length(); ++i
) {
753 ret64
+= mStreams
[i
].mCurrentPos
;
756 // When we see 1 stream with currentPos = 0, all the remaining streams must
757 // be set to 0 as well.
758 MOZ_ASSERT_IF(zeroFound
, mStreams
[i
].mCurrentPos
== 0);
759 if (mStreams
[i
].mCurrentPos
== 0) {
// Truncation is not supported on a multiplex stream: always returns
// NS_ERROR_NOT_IMPLEMENTED.
770 nsMultiplexInputStream::SetEOF() { return NS_ERROR_NOT_IMPLEMENTED
; }
// Behaves exactly like Close(); aStatus is not used.
773 nsMultiplexInputStream::CloseWithStatus(nsresult aStatus
) { return Close(); }
775 // This class is used to inform nsMultiplexInputStream that it's time to execute
776 // the asyncWait callback.
// It keeps a strong reference to the stream and, when run, calls the
// argument-less AsyncWaitCompleted() on it.
777 class AsyncWaitRunnable final
: public DiscardableRunnable
{
778 RefPtr
<nsMultiplexInputStream
> mStream
;
// Builds a runnable for aStream and dispatches it to aEventTarget with
// NS_DISPATCH_NORMAL.
781 static void Create(nsMultiplexInputStream
* aStream
,
782 nsIEventTarget
* aEventTarget
) {
783 RefPtr
<AsyncWaitRunnable
> runnable
= new AsyncWaitRunnable(aStream
);
785 aEventTarget
->Dispatch(runnable
.forget(), NS_DISPATCH_NORMAL
);
793 mStream
->AsyncWaitCompleted();
798 explicit AsyncWaitRunnable(nsMultiplexInputStream
* aStream
)
799 : DiscardableRunnable("AsyncWaitRunnable"), mStream(aStream
) {
// Registers (or, with a null aCallback, replaces with nothing) the callback
// to notify when data can be read. Only one callback may be pending: passing
// a new one while another is registered fails with NS_ERROR_FAILURE. The
// callback and its arguments are stored, and the wait is then started
// through AsyncWaitInternal().
805 nsMultiplexInputStream::AsyncWait(nsIInputStreamCallback
* aCallback
,
806 uint32_t aFlags
, uint32_t aRequestedCount
,
807 nsIEventTarget
* aEventTarget
) {
809 MutexAutoLock
lock(mLock
);
811 // We must execute the callback also when the stream is closed.
812 if (NS_FAILED(mStatus
) && mStatus
!= NS_BASE_STREAM_CLOSED
) {
816 if (mAsyncWaitCallback
&& aCallback
) {
817 return NS_ERROR_FAILURE
;
820 mAsyncWaitCallback
= aCallback
;
821 mAsyncWaitFlags
= aFlags
;
822 mAsyncWaitRequestedCount
= aRequestedCount
;
823 mAsyncWaitEventTarget
= aEventTarget
;
825 if (!mAsyncWaitCallback
) {
// NOTE(review): AsyncWaitInternal() acquires mLock itself, so the lock taken
// above must already be released here -- confirm its scope.
830 return AsyncWaitInternal();
// Starts the wait requested through AsyncWait(). Under mLock it advances
// mCurrentStream to the first sub-stream worth waiting on and copies the
// stored wait arguments into locals. It then either forwards AsyncWait() to
// that sub-stream, with this object as the callback, or schedules an
// AsyncWaitRunnable so the stored callback is still invoked.
833 nsresult
nsMultiplexInputStream::AsyncWaitInternal() {
834 nsCOMPtr
<nsIAsyncInputStream
> stream
;
835 uint32_t asyncWaitFlags
= 0;
836 uint32_t asyncWaitRequestedCount
= 0;
837 nsCOMPtr
<nsIEventTarget
> asyncWaitEventTarget
;
840 MutexAutoLock
lock(mLock
);
842 // Let's take the first async stream if we are not already closed, and if
843 // it has data to read or if it is async.
844 if (mStatus
!= NS_BASE_STREAM_CLOSED
) {
845 for (; mCurrentStream
< mStreams
.Length(); ++mCurrentStream
) {
846 stream
= mStreams
[mCurrentStream
].mAsyncStream
;
852 nsresult rv
= AvailableMaybeSeek(mStreams
[mCurrentStream
], &avail
);
853 if (rv
== NS_BASE_STREAM_CLOSED
|| (NS_SUCCEEDED(rv
) && avail
== 0)) {
854 // Nothing to read here. Let's move on.
// Copy the wait arguments so they can be used by the calls below.
866 asyncWaitFlags
= mAsyncWaitFlags
;
867 asyncWaitRequestedCount
= mAsyncWaitRequestedCount
;
868 asyncWaitEventTarget
= mAsyncWaitEventTarget
;
871 MOZ_ASSERT_IF(stream
, NS_SUCCEEDED(mStatus
));
873 // If we are here it's because we are already closed, or if the current stream
874 // is not async. In both cases we have to execute the callback.
876 AsyncWaitRunnable::Create(this, asyncWaitEventTarget
);
880 return stream
->AsyncWait(this, asyncWaitFlags
, asyncWaitRequestedCount
,
881 asyncWaitEventTarget
);
// Callback invoked by the sub-stream that AsyncWaitInternal() waited on.
// If that sub-stream turns out to be closed or empty while the multiplex
// stream is still healthy, the wait is restarted on the next sub-stream;
// otherwise the stored callback is taken out and notified with this stream.
885 nsMultiplexInputStream::OnInputStreamReady(nsIAsyncInputStream
* aStream
) {
886 nsCOMPtr
<nsIInputStreamCallback
> callback
;
888 // When OnInputStreamReady is called, we could be in 2 scenarios:
889 // a. there is something to read;
890 // b. the stream is closed.
891 // But if the stream is closed and it was not the last one, we must proceed
892 // with the following stream in order to have something to read by the callee.
895 MutexAutoLock
lock(mLock
);
897 // The callback has been nullified in the meantime.
898 if (!mAsyncWaitCallback
) {
902 if (NS_SUCCEEDED(mStatus
)) {
904 nsresult rv
= aStream
->Available(&avail
);
905 if (rv
== NS_BASE_STREAM_CLOSED
|| avail
== 0) {
906 // This stream is closed or empty, let's move to the following one.
// AsyncWaitInternal() takes mLock itself, so drop it for the call.
908 MutexAutoUnlock
unlock(mLock
);
909 return AsyncWaitInternal();
// Move the callback into a local and clear the stored state before
// notifying.
913 mAsyncWaitCallback
.swap(callback
);
914 mAsyncWaitEventTarget
= nullptr;
917 return callback
->OnInputStreamReady(this);
// Runs the pending AsyncWait callback; called from AsyncWaitRunnable. The
// callback is moved out of the member and the stored event target is cleared
// under mLock, then OnInputStreamReady(this) is invoked on it.
920 void nsMultiplexInputStream::AsyncWaitCompleted() {
921 nsCOMPtr
<nsIInputStreamCallback
> callback
;
924 MutexAutoLock
lock(mLock
);
926 // The callback has been nullified in the meantime.
927 if (!mAsyncWaitCallback
) {
931 mAsyncWaitCallback
.swap(callback
);
932 mAsyncWaitEventTarget
= nullptr;
935 callback
->OnInputStreamReady(this);
// Factory entry point: creates a new nsMultiplexInputStream and returns the
// interface requested through aIID. Aggregation is rejected with
// NS_ERROR_NO_AGGREGATION.
938 nsresult
nsMultiplexInputStreamConstructor(nsISupports
* aOuter
, REFNSIID aIID
,
943 return NS_ERROR_NO_AGGREGATION
;
946 RefPtr
<nsMultiplexInputStream
> inst
= new nsMultiplexInputStream();
948 return inst
->QueryInterface(aIID
, aResult
);
// Serialize() overload for a ParentToChildStreamActorManager; forwards all
// arguments unchanged to SerializeInternal().
951 void nsMultiplexInputStream::Serialize(
952 InputStreamParams
& aParams
, FileDescriptorArray
& aFileDescriptors
,
953 bool aDelayedStart
, uint32_t aMaxSize
, uint32_t* aSizeUsed
,
954 mozilla::ipc::ParentToChildStreamActorManager
* aManager
) {
955 SerializeInternal(aParams
, aFileDescriptors
, aDelayedStart
, aMaxSize
,
956 aSizeUsed
, aManager
);
// Serialize() overload for a ChildToParentStreamActorManager; forwards all
// arguments unchanged to SerializeInternal().
959 void nsMultiplexInputStream::Serialize(
960 InputStreamParams
& aParams
, FileDescriptorArray
& aFileDescriptors
,
961 bool aDelayedStart
, uint32_t aMaxSize
, uint32_t* aSizeUsed
,
962 mozilla::ipc::ChildToParentStreamActorManager
* aManager
) {
963 SerializeInternal(aParams
, aFileDescriptors
, aDelayedStart
, aMaxSize
,
964 aSizeUsed
, aManager
);
// Serializes the multiplex stream into aParams. Every sub-stream's original
// stream is serialized in order through InputStreamHelper, with a checked
// running total of the size each one reports. The read cursor and status are
// stored next to the list, and the total is written to *aSizeUsed.
967 template <typename M
>
968 void nsMultiplexInputStream::SerializeInternal(
969 InputStreamParams
& aParams
, FileDescriptorArray
& aFileDescriptors
,
970 bool aDelayedStart
, uint32_t aMaxSize
, uint32_t* aSizeUsed
, M
* aManager
) {
971 MutexAutoLock
lock(mLock
);
973 MultiplexInputStreamParams params
;
// Checked arithmetic so an overflow is caught by the diagnostic asserts
// below.
975 CheckedUint32 totalSizeUsed
= 0;
976 CheckedUint32 maxSize
= aMaxSize
;
978 uint32_t streamCount
= mStreams
.Length();
980 nsTArray
<InputStreamParams
>& streams
= params
.streams();
982 streams
.SetCapacity(streamCount
);
983 for (uint32_t index
= 0; index
< streamCount
; index
++) {
984 uint32_t sizeUsed
= 0;
// The original stream is serialized, not the buffered wrapper.
985 InputStreamHelper::SerializeInputStream(
986 mStreams
[index
].mOriginalStream
, *streams
.AppendElement(),
987 aFileDescriptors
, aDelayedStart
, maxSize
.value(), &sizeUsed
,
990 MOZ_ASSERT(maxSize
.value() >= sizeUsed
);
993 MOZ_DIAGNOSTIC_ASSERT(maxSize
.isValid());
995 totalSizeUsed
+= sizeUsed
;
996 MOZ_DIAGNOSTIC_ASSERT(totalSizeUsed
.isValid());
// Carry the read cursor and status across so Deserialize() can restore them.
1000 params
.currentStream() = mCurrentStream
;
1001 params
.status() = mStatus
;
1002 params
.startedReadingCurrent() = mStartedReadingCurrent
;
1004 aParams
= std::move(params
);
1006 MOZ_ASSERT(aSizeUsed
);
1007 *aSizeUsed
= totalSizeUsed
.value();
// Rebuilds the multiplex stream from aParams, which must hold
// MultiplexInputStreamParams. Each serialized sub-stream is deserialized and
// appended in order, after which the saved cursor (current stream, status,
// started-reading flag) is restored.
1010 bool nsMultiplexInputStream::Deserialize(
1011 const InputStreamParams
& aParams
,
1012 const FileDescriptorArray
& aFileDescriptors
) {
1013 if (aParams
.type() != InputStreamParams::TMultiplexInputStreamParams
) {
1014 NS_ERROR("Received unknown parameters from the other process!");
1018 const MultiplexInputStreamParams
& params
=
1019 aParams
.get_MultiplexInputStreamParams();
1021 const nsTArray
<InputStreamParams
>& streams
= params
.streams();
1023 uint32_t streamCount
= streams
.Length();
1024 for (uint32_t index
= 0; index
< streamCount
; index
++) {
1025 nsCOMPtr
<nsIInputStream
> stream
= InputStreamHelper::DeserializeInputStream(
1026 streams
[index
], aFileDescriptors
);
1028 NS_WARNING("Deserialize failed!");
1032 if (NS_FAILED(AppendStream(stream
))) {
1033 NS_WARNING("AppendStream failed!");
// Restore the cursor only after all sub-streams have been appended.
// NOTE(review): these members are written without taking mLock here --
// presumably safe because the object is not shared yet; confirm.
1038 mCurrentStream
= params
.currentStream();
1039 mStatus
= params
.status();
1040 mStartedReadingCurrent
= params
.startedReadingCurrent();
// Reports whether Clone() can be used. The answer is false once reading has
// started, or when any sub-stream either lacks nsICloneableInputStream or
// says it is not cloneable.
1046 nsMultiplexInputStream::GetCloneable(bool* aCloneable
) {
1047 MutexAutoLock
lock(mLock
);
1048 // XXXnsm Cloning a multiplex stream which has started reading is not
1049 // permitted right now.
1050 if (mCurrentStream
> 0 || mStartedReadingCurrent
) {
1051 *aCloneable
= false;
1055 uint32_t len
= mStreams
.Length();
1056 for (uint32_t i
= 0; i
< len
; ++i
) {
1057 nsCOMPtr
<nsICloneableInputStream
> cis
=
1058 do_QueryInterface(mStreams
[i
].mBufferedStream
);
1059 if (!cis
|| !cis
->GetCloneable()) {
1060 *aCloneable
= false;
// Produces a new multiplex stream whose sub-streams are clones of this
// stream's sub-streams, in the same order. Fails with NS_ERROR_FAILURE when
// reading has already started or when a sub-stream does not implement
// nsICloneableInputStream.
1070 nsMultiplexInputStream::Clone(nsIInputStream
** aClone
) {
1071 MutexAutoLock
lock(mLock
);
1073 // XXXnsm Cloning a multiplex stream which has started reading is not
1074 // permitted right now.
1075 if (mCurrentStream
> 0 || mStartedReadingCurrent
) {
1076 return NS_ERROR_FAILURE
;
1079 RefPtr
<nsMultiplexInputStream
> clone
= new nsMultiplexInputStream();
1082 uint32_t len
= mStreams
.Length();
1083 for (uint32_t i
= 0; i
< len
; ++i
) {
1084 nsCOMPtr
<nsICloneableInputStream
> substream
=
1085 do_QueryInterface(mStreams
[i
].mBufferedStream
);
1086 if (NS_WARN_IF(!substream
)) {
1087 return NS_ERROR_FAILURE
;
1090 nsCOMPtr
<nsIInputStream
> clonedSubstream
;
1091 rv
= substream
->Clone(getter_AddRefs(clonedSubstream
));
1092 if (NS_WARN_IF(NS_FAILED(rv
))) {
1096 rv
= clone
->AppendStream(clonedSubstream
);
1097 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Transfer ownership of the finished clone to the caller.
1102 clone
.forget(aClone
);
// Computes the total length of the multiplex stream. It is only available
// before any data has been read (NS_ERROR_NOT_AVAILABLE otherwise). For each
// sub-stream either its nsIInputStreamLength::Length() or, as a fallback,
// its Available() value is added into a checked 64-bit total; an overflow of
// that total is reported as NS_ERROR_OUT_OF_MEMORY.
1107 nsMultiplexInputStream::Length(int64_t* aLength
) {
1108 MutexAutoLock
lock(mLock
);
1110 if (mCurrentStream
> 0 || mStartedReadingCurrent
) {
1111 return NS_ERROR_NOT_AVAILABLE
;
1114 CheckedInt64 length
= 0;
// Result to return at the end; switched to WOULD_BLOCK when a sub-stream
// cannot answer synchronously.
1115 nsresult retval
= NS_OK
;
1117 for (uint32_t i
= 0, len
= mStreams
.Length(); i
< len
; ++i
) {
1118 nsCOMPtr
<nsIInputStreamLength
> substream
=
1119 do_QueryInterface(mStreams
[i
].mBufferedStream
);
1121 // Let's use available as fallback.
1122 uint64_t streamAvail
= 0;
1123 nsresult rv
= AvailableMaybeSeek(mStreams
[i
], &streamAvail
);
1124 if (rv
== NS_BASE_STREAM_CLOSED
) {
1128 if (NS_WARN_IF(NS_FAILED(rv
))) {
1133 length
+= streamAvail
;
1134 if (!length
.isValid()) {
1135 return NS_ERROR_OUT_OF_MEMORY
;
1142 nsresult rv
= substream
->Length(&size
);
1143 if (rv
== NS_BASE_STREAM_CLOSED
) {
1147 if (rv
== NS_ERROR_NOT_AVAILABLE
) {
1151 // If one stream blocks, we all block.
1152 if (rv
!= NS_BASE_STREAM_WOULD_BLOCK
&& NS_WARN_IF(NS_FAILED(rv
))) {
1156 // We want to return WOULD_BLOCK if there is 1 stream that blocks. But want
1157 // to see if there are other streams with length = -1.
1158 if (rv
== NS_BASE_STREAM_WOULD_BLOCK
) {
1159 retval
= NS_BASE_STREAM_WOULD_BLOCK
;
1163 // If one of the stream doesn't know the size, we all don't know the size.
1170 if (!length
.isValid()) {
1171 return NS_ERROR_OUT_OF_MEMORY
;
1175 *aLength
= length
.value();
// Helper that gathers the length of a multiplex stream asynchronously.
// AsyncLengthWait() feeds it the sizes it already knows (AddSize), the
// sub-streams that must be asked asynchronously (AddStream) and the "size
// unknown" marker (NegativeSize); Proceed() then either notifies the parent
// stream right away or waits for the pending sub-streams to answer through
// OnInputStreamLengthReady().
1179 class nsMultiplexInputStream::AsyncWaitLengthHelper final
1180 : public nsIInputStreamLengthCallback
1186 AsyncWaitLengthHelper()
1187 : mStreamNotified(false), mLength(0), mNegativeSize(false) {}
// Queues a sub-stream whose length must be requested asynchronously.
// Returns false when the fallible append fails.
1189 bool AddStream(nsIAsyncInputStreamLength
* aStream
) {
1190 return mPendingStreams
.AppendElement(aStream
, fallible
);
// Accounts for a known size; the return value tells whether the checked
// total is still valid.
1193 bool AddSize(int64_t aSize
) {
1194 MOZ_ASSERT(!mNegativeSize
);
1197 return mLength
.isValid();
// Records that at least one sub-stream cannot report a size.
1200 void NegativeSize() {
1201 MOZ_ASSERT(!mNegativeSize
);
1202 mNegativeSize
= true;
// Starts the operation. aProofOfLock documents that the caller holds the
// parent stream's lock.
1205 nsresult
Proceed(nsMultiplexInputStream
* aParentStream
,
1206 nsIEventTarget
* aEventTarget
,
1207 const MutexAutoLock
& aProofOfLock
) {
1208 MOZ_ASSERT(!mStream
);
1210 // If we don't need to wait, let's inform the callback immediately.
1211 if (mPendingStreams
.IsEmpty() || mNegativeSize
) {
1212 RefPtr
<nsMultiplexInputStream
> parentStream
= aParentStream
;
// -1 is reported unless a valid, non-negative total is known.
1213 int64_t length
= -1;
1214 if (!mNegativeSize
&& mLength
.isValid()) {
1215 length
= mLength
.value();
// The notification is dispatched to aEventTarget; the runnable takes the
// parent's lock itself before calling AsyncWaitCompleted().
1217 nsCOMPtr
<nsIRunnable
> r
= NS_NewRunnableFunction(
1218 "AsyncWaitLengthHelper", [parentStream
, length
]() {
1219 MutexAutoLock
lock(parentStream
->GetLock());
1220 parentStream
->AsyncWaitCompleted(length
, lock
);
1222 return aEventTarget
->Dispatch(r
.forget(), NS_DISPATCH_NORMAL
);
1225 // Let's store the callback and the parent stream until we have
1226 // notifications from the async length streams.
1228 mStream
= aParentStream
;
1230 // Let's activate all the pending streams.
1231 for (nsIAsyncInputStreamLength
* stream
: mPendingStreams
) {
1232 nsresult rv
= stream
->AsyncLengthWait(this, aEventTarget
);
1233 if (rv
== NS_BASE_STREAM_CLOSED
) {
1237 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Called by a pending sub-stream once its length is known. Removes the
// sub-stream from the pending list and notifies the parent stream once
// nothing is pending any more or the size has become unknown.
1246 OnInputStreamLengthReady(nsIAsyncInputStreamLength
* aStream
,
1247 int64_t aLength
) override
{
1248 MutexAutoLock
lock(mStream
->GetLock());
1250 MOZ_ASSERT(mPendingStreams
.Contains(aStream
));
1251 mPendingStreams
.RemoveElement(aStream
);
1253 // Already notified.
1254 if (mStreamNotified
) {
1258 if (aLength
== -1) {
1259 mNegativeSize
= true;
// An overflowed total is treated like an unknown size.
1262 if (!mLength
.isValid()) {
1263 mNegativeSize
= true;
// Keep waiting while the size is still known and other sub-streams are
// outstanding.
1268 if (!mNegativeSize
&& !mPendingStreams
.IsEmpty()) {
1272 // Let's notify the parent stream.
1273 mStreamNotified
= true;
1274 mStream
->AsyncWaitCompleted(mNegativeSize
? -1 : mLength
.value(), lock
);
1279 ~AsyncWaitLengthHelper() = default;
// Parent stream; set by Proceed() when the helper has to wait.
1281 RefPtr
<nsMultiplexInputStream
> mStream
;
// True once the parent has been notified, so later answers are ignored.
1282 bool mStreamNotified
;
// Checked running total of the known sizes.
1284 CheckedInt64 mLength
;
// Sub-streams that have not reported their length yet.
1287 nsTArray
<nsCOMPtr
<nsIAsyncInputStreamLength
>> mPendingStreams
;
// nsISupports implementation for the helper; it exposes
// nsIInputStreamLengthCallback only.
1290 NS_IMPL_ISUPPORTS(nsMultiplexInputStream::AsyncWaitLengthHelper
,
1291 nsIInputStreamLengthCallback
)
// Asynchronous counterpart of Length(). Requires an event target and is
// only available before reading has started. A fresh AsyncWaitLengthHelper
// is populated from every sub-stream: async-length streams are queued, and
// all others contribute their Length() or, as a fallback, their Available()
// value. Proceed() then starts the operation, and the helper and callback
// are stored until AsyncWaitCompleted(int64_t, ...) runs.
1294 nsMultiplexInputStream::AsyncLengthWait(nsIInputStreamLengthCallback
* aCallback
,
1295 nsIEventTarget
* aEventTarget
) {
1296 if (NS_WARN_IF(!aEventTarget
)) {
1297 return NS_ERROR_NULL_POINTER
;
1300 MutexAutoLock
lock(mLock
);
1302 if (mCurrentStream
> 0 || mStartedReadingCurrent
) {
1303 return NS_ERROR_NOT_AVAILABLE
;
1307 mAsyncWaitLengthCallback
= nullptr;
1311 // We have a pending operation! Let's use this instead of creating a new one.
1312 if (mAsyncWaitLengthHelper
) {
1313 mAsyncWaitLengthCallback
= aCallback
;
1317 RefPtr
<AsyncWaitLengthHelper
> helper
= new AsyncWaitLengthHelper();
1319 for (uint32_t i
= 0, len
= mStreams
.Length(); i
< len
; ++i
) {
// Preferred path: the sub-stream can report its length asynchronously.
1320 nsCOMPtr
<nsIAsyncInputStreamLength
> asyncStream
=
1321 do_QueryInterface(mStreams
[i
].mBufferedStream
);
1323 if (NS_WARN_IF(!helper
->AddStream(asyncStream
))) {
1324 return NS_ERROR_OUT_OF_MEMORY
;
1329 nsCOMPtr
<nsIInputStreamLength
> stream
=
1330 do_QueryInterface(mStreams
[i
].mBufferedStream
);
1332 // Let's use available as fallback.
1333 uint64_t streamAvail
= 0;
1334 nsresult rv
= AvailableMaybeSeek(mStreams
[i
], &streamAvail
);
1335 if (rv
== NS_BASE_STREAM_CLOSED
) {
1339 if (NS_WARN_IF(NS_FAILED(rv
))) {
1344 if (NS_WARN_IF(!helper
->AddSize(streamAvail
))) {
1345 return NS_ERROR_OUT_OF_MEMORY
;
1352 nsresult rv
= stream
->Length(&size
);
1353 if (rv
== NS_BASE_STREAM_CLOSED
) {
1357 MOZ_ASSERT(rv
!= NS_BASE_STREAM_WOULD_BLOCK
,
1358 "A nsILengthInutStream returns NS_BASE_STREAM_WOULD_BLOCK but "
1359 "it doesn't implement nsIAsyncInputStreamLength.");
1361 if (NS_WARN_IF(NS_FAILED(rv
))) {
1366 helper
->NegativeSize();
1370 if (NS_WARN_IF(!helper
->AddSize(size
))) {
1371 return NS_ERROR_OUT_OF_MEMORY
;
// Kick off the operation; |lock| is passed as proof that mLock is held.
1375 nsresult rv
= helper
->Proceed(this, aEventTarget
, lock
);
1376 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Remember the in-flight operation only once it has been started.
1380 mAsyncWaitLengthHelper
= helper
;
1381 mAsyncWaitLengthCallback
= aCallback
;
// Completion of an AsyncLengthWait() operation. Called with mLock held
// (aProofOfLock): takes the pending length callback out of the member,
// drops the helper, and invokes the callback with aLength after temporarily
// releasing the lock.
1385 void nsMultiplexInputStream::AsyncWaitCompleted(
1386 int64_t aLength
, const MutexAutoLock
& aProofOfLock
) {
1387 nsCOMPtr
<nsIInputStreamLengthCallback
> callback
;
1388 callback
.swap(mAsyncWaitLengthCallback
);
1390 mAsyncWaitLengthHelper
= nullptr;
1392 // Already canceled.
// Release the lock for the duration of the callback.
1397 MutexAutoUnlock
unlock(mLock
);
1398 callback
->OnInputStreamLengthReady(this, aLength
);
// UpdateQIMap() and its two helper macros. MAYBE_UPDATE_VALUE(x, y) queries
// aStream.mBufferedStream for interface y and passes the result, together
// with counter x, to MAYBE_UPDATE_VALUE_REAL. UpdateQIMap() applies them to
// each of the six per-interface counters for a newly appended sub-stream,
// reusing the already-queried mSeekableStream and mAsyncStream members where
// available. MAYBE_UPDATE_VALUE is undefined again afterwards.
// NOTE(review): the body of MAYBE_UPDATE_VALUE_REAL is not visible here;
// presumably it increments x when y is non-null -- confirm.
1401 #define MAYBE_UPDATE_VALUE_REAL(x, y) \
1406 #define MAYBE_UPDATE_VALUE(x, y) \
1408 nsCOMPtr<y> substream = do_QueryInterface(aStream.mBufferedStream); \
1409 MAYBE_UPDATE_VALUE_REAL(x, substream) \
1412 void nsMultiplexInputStream::UpdateQIMap(StreamData
& aStream
) {
1413 MAYBE_UPDATE_VALUE_REAL(mSeekableStreams
, aStream
.mSeekableStream
)
1414 MAYBE_UPDATE_VALUE(mIPCSerializableStreams
, nsIIPCSerializableInputStream
)
1415 MAYBE_UPDATE_VALUE(mCloneableStreams
, nsICloneableInputStream
)
1416 MAYBE_UPDATE_VALUE_REAL(mAsyncInputStreams
, aStream
.mAsyncStream
)
1417 MAYBE_UPDATE_VALUE(mInputStreamLengths
, nsIInputStreamLength
)
1418 MAYBE_UPDATE_VALUE(mAsyncInputStreamLengths
, nsIAsyncInputStreamLength
)
1421 #undef MAYBE_UPDATE_VALUE
// True when every sub-stream is seekable, i.e. the seekable counter equals
// the number of sub-streams (also true for an empty stream list).
1423 bool nsMultiplexInputStream::IsSeekable() const {
1424 return mStreams
.Length() == mSeekableStreams
;
// True when every sub-stream is IPC-serializable (counter equals the number
// of sub-streams).
1427 bool nsMultiplexInputStream::IsIPCSerializable() const {
1428 return mStreams
.Length() == mIPCSerializableStreams
;
// True when every sub-stream implements the cloneable interface (counter
// equals the number of sub-streams).
1431 bool nsMultiplexInputStream::IsCloneable() const {
1432 return mStreams
.Length() == mCloneableStreams
;
// Unlike the "all sub-streams" predicates, a single async sub-stream is
// enough here.
1435 bool nsMultiplexInputStream::IsAsyncInputStream() const {
1436 // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
1437 // substream implements that interface.
1438 return !!mAsyncInputStreams
;
// True when at least one sub-stream implements nsIInputStreamLength.
1441 bool nsMultiplexInputStream::IsInputStreamLength() const {
1442 return !!mInputStreamLengths
;
// True when at least one sub-stream implements nsIAsyncInputStreamLength.
1445 bool nsMultiplexInputStream::IsAsyncInputStreamLength() const {
1446 return !!mAsyncInputStreamLengths
;