Bug 1690340 - Part 2: Use the new naming for the developer tools menu items. r=jdescottes
[gecko.git] / xpcom / io / nsMultiplexInputStream.cpp
blob5bde3d130a59ce637e29a18127643dfd13241a25
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 /**
8 * The multiplex stream concatenates a list of input streams into a single
9 * stream.
12 #include "mozilla/Attributes.h"
13 #include "mozilla/CheckedInt.h"
14 #include "mozilla/MathAlgorithms.h"
15 #include "mozilla/Mutex.h"
17 #include "base/basictypes.h"
19 #include "nsMultiplexInputStream.h"
20 #include "nsIBufferedStreams.h"
21 #include "nsICloneableInputStream.h"
22 #include "nsIMultiplexInputStream.h"
23 #include "nsISeekableStream.h"
24 #include "nsCOMPtr.h"
25 #include "nsCOMArray.h"
26 #include "nsIClassInfoImpl.h"
27 #include "nsIIPCSerializableInputStream.h"
28 #include "mozilla/ipc/InputStreamUtils.h"
29 #include "nsIAsyncInputStream.h"
30 #include "nsIInputStreamLength.h"
31 #include "nsNetUtil.h"
32 #include "nsStreamUtils.h"
34 using namespace mozilla;
35 using namespace mozilla::ipc;
37 using mozilla::DeprecatedAbs;
38 using mozilla::Maybe;
39 using mozilla::Nothing;
40 using mozilla::Some;
42 class nsMultiplexInputStream final : public nsIMultiplexInputStream,
43 public nsISeekableStream,
44 public nsIIPCSerializableInputStream,
45 public nsICloneableInputStream,
46 public nsIAsyncInputStream,
47 public nsIInputStreamCallback,
48 public nsIInputStreamLength,
49 public nsIAsyncInputStreamLength {
50 public:
51 nsMultiplexInputStream();
53 NS_DECL_THREADSAFE_ISUPPORTS
54 NS_DECL_NSIINPUTSTREAM
55 NS_DECL_NSIMULTIPLEXINPUTSTREAM
56 NS_DECL_NSISEEKABLESTREAM
57 NS_DECL_NSITELLABLESTREAM
58 NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
59 NS_DECL_NSICLONEABLEINPUTSTREAM
60 NS_DECL_NSIASYNCINPUTSTREAM
61 NS_DECL_NSIINPUTSTREAMCALLBACK
62 NS_DECL_NSIINPUTSTREAMLENGTH
63 NS_DECL_NSIASYNCINPUTSTREAMLENGTH
65 // This is used for nsIAsyncInputStream::AsyncWait
66 void AsyncWaitCompleted();
68 // This is used for nsIAsyncInputStreamLength::AsyncLengthWait
69 void AsyncWaitCompleted(int64_t aLength, const MutexAutoLock& aProofOfLock);
71 struct StreamData {
72 nsresult Initialize(nsIInputStream* aOriginalStream) {
73 mCurrentPos = 0;
75 mOriginalStream = aOriginalStream;
77 mBufferedStream = aOriginalStream;
78 if (!NS_InputStreamIsBuffered(mBufferedStream)) {
79 nsCOMPtr<nsIInputStream> bufferedStream;
80 nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(bufferedStream),
81 mBufferedStream.forget(), 4096);
82 NS_ENSURE_SUCCESS(rv, rv);
83 mBufferedStream = bufferedStream;
86 mAsyncStream = do_QueryInterface(mBufferedStream);
87 mSeekableStream = do_QueryInterface(mBufferedStream);
89 return NS_OK;
92 nsCOMPtr<nsIInputStream> mOriginalStream;
94 // Equal to mOriginalStream or a wrap around the original stream to make it
95 // buffered.
96 nsCOMPtr<nsIInputStream> mBufferedStream;
98 // This can be null.
99 nsCOMPtr<nsIAsyncInputStream> mAsyncStream;
100 // This can be null.
101 nsCOMPtr<nsISeekableStream> mSeekableStream;
103 uint64_t mCurrentPos;
106 Mutex& GetLock() { return mLock; }
108 private:
109 ~nsMultiplexInputStream() = default;
111 nsresult AsyncWaitInternal();
113 // This method updates mSeekableStreams, mTellableStreams,
114 // mIPCSerializableStreams and mCloneableStreams values.
115 void UpdateQIMap(StreamData& aStream);
117 struct MOZ_STACK_CLASS ReadSegmentsState {
118 nsCOMPtr<nsIInputStream> mThisStream;
119 uint32_t mOffset;
120 nsWriteSegmentFun mWriter;
121 void* mClosure;
122 bool mDone;
125 template <typename M>
126 void SerializeInternal(InputStreamParams& aParams,
127 FileDescriptorArray& aFileDescriptors,
128 bool aDelayedStart, uint32_t aMaxSize,
129 uint32_t* aSizeUsed, M* aManager);
131 static nsresult ReadSegCb(nsIInputStream* aIn, void* aClosure,
132 const char* aFromRawSegment, uint32_t aToOffset,
133 uint32_t aCount, uint32_t* aWriteCount);
135 bool IsSeekable() const;
136 bool IsIPCSerializable() const;
137 bool IsCloneable() const;
138 bool IsAsyncInputStream() const;
139 bool IsInputStreamLength() const;
140 bool IsAsyncInputStreamLength() const;
142 Mutex mLock; // Protects access to all data members.
144 nsTArray<StreamData> mStreams;
146 uint32_t mCurrentStream;
147 bool mStartedReadingCurrent;
148 nsresult mStatus;
149 nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;
150 uint32_t mAsyncWaitFlags;
151 uint32_t mAsyncWaitRequestedCount;
152 nsCOMPtr<nsIEventTarget> mAsyncWaitEventTarget;
153 nsCOMPtr<nsIInputStreamLengthCallback> mAsyncWaitLengthCallback;
155 class AsyncWaitLengthHelper;
156 RefPtr<AsyncWaitLengthHelper> mAsyncWaitLengthHelper;
158 uint32_t mSeekableStreams;
159 uint32_t mIPCSerializableStreams;
160 uint32_t mCloneableStreams;
161 uint32_t mAsyncInputStreams;
162 uint32_t mInputStreamLengths;
163 uint32_t mAsyncInputStreamLengths;
166 NS_IMPL_ADDREF(nsMultiplexInputStream)
167 NS_IMPL_RELEASE(nsMultiplexInputStream)
169 NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
170 NS_MULTIPLEXINPUTSTREAM_CID)
172 NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream)
173 NS_INTERFACE_MAP_ENTRY(nsIMultiplexInputStream)
174 NS_INTERFACE_MAP_ENTRY(nsIInputStream)
175 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, IsSeekable())
176 NS_INTERFACE_MAP_ENTRY(nsITellableStream)
177 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
178 IsIPCSerializable())
179 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream, IsCloneable())
180 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, IsAsyncInputStream())
181 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback,
182 IsAsyncInputStream())
183 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength,
184 IsInputStreamLength())
185 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength,
186 IsAsyncInputStreamLength())
187 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIMultiplexInputStream)
188 NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream)
189 NS_INTERFACE_MAP_END
191 NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream, nsIMultiplexInputStream,
192 nsIInputStream, nsISeekableStream,
193 nsITellableStream)
195 static nsresult AvailableMaybeSeek(nsMultiplexInputStream::StreamData& aStream,
196 uint64_t* aResult) {
197 nsresult rv = aStream.mBufferedStream->Available(aResult);
198 if (rv == NS_BASE_STREAM_CLOSED) {
199 // Blindly seek to the current position if Available() returns
200 // NS_BASE_STREAM_CLOSED.
201 // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
202 // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
203 if (aStream.mSeekableStream) {
204 nsresult rvSeek =
205 aStream.mSeekableStream->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
206 if (NS_SUCCEEDED(rvSeek)) {
207 rv = aStream.mBufferedStream->Available(aResult);
211 return rv;
214 nsMultiplexInputStream::nsMultiplexInputStream()
215 : mLock("nsMultiplexInputStream lock"),
216 mCurrentStream(0),
217 mStartedReadingCurrent(false),
218 mStatus(NS_OK),
219 mAsyncWaitFlags(0),
220 mAsyncWaitRequestedCount(0),
221 mSeekableStreams(0),
222 mIPCSerializableStreams(0),
223 mCloneableStreams(0),
224 mAsyncInputStreams(0),
225 mInputStreamLengths(0),
226 mAsyncInputStreamLengths(0) {}
228 NS_IMETHODIMP
229 nsMultiplexInputStream::GetCount(uint32_t* aCount) {
230 MutexAutoLock lock(mLock);
231 *aCount = mStreams.Length();
232 return NS_OK;
235 NS_IMETHODIMP
236 nsMultiplexInputStream::AppendStream(nsIInputStream* aStream) {
237 MutexAutoLock lock(mLock);
239 StreamData* streamData = mStreams.AppendElement(fallible);
240 if (NS_WARN_IF(!streamData)) {
241 return NS_ERROR_OUT_OF_MEMORY;
244 nsresult rv = streamData->Initialize(aStream);
245 NS_ENSURE_SUCCESS(rv, rv);
247 UpdateQIMap(*streamData);
249 if (mStatus == NS_BASE_STREAM_CLOSED) {
250 // We were closed, but now we have more data to read.
251 mStatus = NS_OK;
254 return NS_OK;
257 NS_IMETHODIMP
258 nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream** aResult) {
259 MutexAutoLock lock(mLock);
261 if (aIndex >= mStreams.Length()) {
262 return NS_ERROR_NOT_AVAILABLE;
265 StreamData& streamData = mStreams.ElementAt(aIndex);
266 nsCOMPtr<nsIInputStream> stream = streamData.mOriginalStream;
267 stream.forget(aResult);
268 return NS_OK;
271 NS_IMETHODIMP
272 nsMultiplexInputStream::Close() {
273 nsTArray<nsCOMPtr<nsIInputStream>> streams;
275 // Let's take a copy of the streams becuase, calling close() it could trigger
276 // a nsIInputStreamCallback immediately and we don't want to create a deadlock
277 // with mutex.
279 MutexAutoLock lock(mLock);
280 uint32_t len = mStreams.Length();
281 for (uint32_t i = 0; i < len; ++i) {
282 if (NS_WARN_IF(
283 !streams.AppendElement(mStreams[i].mBufferedStream, fallible))) {
284 mStatus = NS_BASE_STREAM_CLOSED;
285 return NS_ERROR_OUT_OF_MEMORY;
288 mStatus = NS_BASE_STREAM_CLOSED;
291 nsresult rv = NS_OK;
293 uint32_t len = streams.Length();
294 for (uint32_t i = 0; i < len; ++i) {
295 nsresult rv2 = streams[i]->Close();
296 // We still want to close all streams, but we should return an error
297 if (NS_FAILED(rv2)) {
298 rv = rv2;
302 return rv;
305 NS_IMETHODIMP
306 nsMultiplexInputStream::Available(uint64_t* aResult) {
307 *aResult = 0;
309 MutexAutoLock lock(mLock);
310 if (NS_FAILED(mStatus)) {
311 return mStatus;
314 uint64_t avail = 0;
315 nsresult rv = NS_BASE_STREAM_CLOSED;
317 uint32_t len = mStreams.Length();
318 for (uint32_t i = mCurrentStream; i < len; i++) {
319 uint64_t streamAvail;
320 rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
321 if (rv == NS_BASE_STREAM_CLOSED) {
322 // If a stream is closed, we continue with the next one.
323 // If this is the current stream we move to the following stream.
324 if (mCurrentStream == i) {
325 ++mCurrentStream;
328 // If this is the last stream, we want to return this error code.
329 continue;
332 if (NS_WARN_IF(NS_FAILED(rv))) {
333 mStatus = rv;
334 return mStatus;
337 // If the current stream is async, we have to return what we have so far
338 // without processing the following streams. This is needed because
339 // ::Available should return only what is currently available. In case of an
340 // nsIAsyncInputStream, we have to call AsyncWait() in order to read more.
341 if (mStreams[i].mAsyncStream) {
342 avail += streamAvail;
343 break;
346 if (streamAvail == 0) {
347 // Nothing to read for this stream. Let's move to the next one.
348 continue;
351 avail += streamAvail;
354 // We still have something to read. We don't want to return an error code yet.
355 if (avail) {
356 *aResult = avail;
357 return NS_OK;
360 // Let's propagate the last error message.
361 mStatus = rv;
362 return rv;
365 NS_IMETHODIMP
366 nsMultiplexInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
367 MutexAutoLock lock(mLock);
368 // It is tempting to implement this method in terms of ReadSegments, but
369 // that would prevent this class from being used with streams that only
370 // implement Read (e.g., file streams).
372 *aResult = 0;
374 if (mStatus == NS_BASE_STREAM_CLOSED) {
375 return NS_OK;
377 if (NS_FAILED(mStatus)) {
378 return mStatus;
381 nsresult rv = NS_OK;
383 uint32_t len = mStreams.Length();
384 while (mCurrentStream < len && aCount) {
385 uint32_t read;
386 rv = mStreams[mCurrentStream].mBufferedStream->Read(aBuf, aCount, &read);
388 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
389 // (This is a bug in those stream implementations)
390 if (rv == NS_BASE_STREAM_CLOSED) {
391 MOZ_ASSERT_UNREACHABLE(
392 "Input stream's Read method returned "
393 "NS_BASE_STREAM_CLOSED");
394 rv = NS_OK;
395 read = 0;
396 } else if (NS_FAILED(rv)) {
397 break;
400 if (read == 0) {
401 ++mCurrentStream;
402 mStartedReadingCurrent = false;
403 } else {
404 NS_ASSERTION(aCount >= read, "Read more than requested");
405 *aResult += read;
406 aCount -= read;
407 aBuf += read;
408 mStartedReadingCurrent = true;
410 mStreams[mCurrentStream].mCurrentPos += read;
413 return *aResult ? NS_OK : rv;
416 NS_IMETHODIMP
417 nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
418 uint32_t aCount, uint32_t* aResult) {
419 MutexAutoLock lock(mLock);
421 if (mStatus == NS_BASE_STREAM_CLOSED) {
422 *aResult = 0;
423 return NS_OK;
425 if (NS_FAILED(mStatus)) {
426 return mStatus;
429 NS_ASSERTION(aWriter, "missing aWriter");
431 nsresult rv = NS_OK;
432 ReadSegmentsState state;
433 state.mThisStream = this;
434 state.mOffset = 0;
435 state.mWriter = aWriter;
436 state.mClosure = aClosure;
437 state.mDone = false;
439 uint32_t len = mStreams.Length();
440 while (mCurrentStream < len && aCount) {
441 uint32_t read;
442 rv = mStreams[mCurrentStream].mBufferedStream->ReadSegments(
443 ReadSegCb, &state, aCount, &read);
445 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
446 // (This is a bug in those stream implementations)
447 if (rv == NS_BASE_STREAM_CLOSED) {
448 MOZ_ASSERT_UNREACHABLE(
449 "Input stream's Read method returned "
450 "NS_BASE_STREAM_CLOSED");
451 rv = NS_OK;
452 read = 0;
455 // if |aWriter| decided to stop reading segments...
456 if (state.mDone || NS_FAILED(rv)) {
457 break;
460 // if stream is empty, then advance to the next stream.
461 if (read == 0) {
462 ++mCurrentStream;
463 mStartedReadingCurrent = false;
464 } else {
465 NS_ASSERTION(aCount >= read, "Read more than requested");
466 state.mOffset += read;
467 aCount -= read;
468 mStartedReadingCurrent = true;
470 mStreams[mCurrentStream].mCurrentPos += read;
474 // if we successfully read some data, then this call succeeded.
475 *aResult = state.mOffset;
476 return state.mOffset ? NS_OK : rv;
479 nsresult nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
480 const char* aFromRawSegment,
481 uint32_t aToOffset, uint32_t aCount,
482 uint32_t* aWriteCount) {
483 nsresult rv;
484 ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
485 rv = (state->mWriter)(state->mThisStream, state->mClosure, aFromRawSegment,
486 aToOffset + state->mOffset, aCount, aWriteCount);
487 if (NS_FAILED(rv)) {
488 state->mDone = true;
490 return rv;
493 NS_IMETHODIMP
494 nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking) {
495 MutexAutoLock lock(mLock);
497 uint32_t len = mStreams.Length();
498 if (len == 0) {
499 // Claim to be non-blocking, since we won't block the caller.
500 *aNonBlocking = true;
501 return NS_OK;
504 for (uint32_t i = 0; i < len; ++i) {
505 nsresult rv = mStreams[i].mBufferedStream->IsNonBlocking(aNonBlocking);
506 if (NS_WARN_IF(NS_FAILED(rv))) {
507 return rv;
509 // If one is blocking the entire stream becomes blocking.
510 if (!*aNonBlocking) {
511 return NS_OK;
515 return NS_OK;
518 NS_IMETHODIMP
519 nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset) {
520 MutexAutoLock lock(mLock);
522 if (NS_FAILED(mStatus)) {
523 return mStatus;
526 nsresult rv;
528 uint32_t oldCurrentStream = mCurrentStream;
529 bool oldStartedReadingCurrent = mStartedReadingCurrent;
531 if (aWhence == NS_SEEK_SET) {
532 int64_t remaining = aOffset;
533 if (aOffset == 0) {
534 mCurrentStream = 0;
536 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
537 nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
538 if (!stream) {
539 return NS_ERROR_FAILURE;
542 // See if all remaining streams should be rewound
543 if (remaining == 0) {
544 if (i < oldCurrentStream ||
545 (i == oldCurrentStream && oldStartedReadingCurrent)) {
546 rv = stream->Seek(NS_SEEK_SET, 0);
547 if (NS_WARN_IF(NS_FAILED(rv))) {
548 return rv;
551 mStreams[i].mCurrentPos = 0;
552 continue;
553 } else {
554 break;
558 // Get position in the current stream
559 int64_t streamPos;
560 if (i > oldCurrentStream ||
561 (i == oldCurrentStream && !oldStartedReadingCurrent)) {
562 streamPos = 0;
563 } else {
564 streamPos = mStreams[i].mCurrentPos;
567 // See if we need to seek the current stream forward or backward
568 if (remaining < streamPos) {
569 rv = stream->Seek(NS_SEEK_SET, remaining);
570 if (NS_WARN_IF(NS_FAILED(rv))) {
571 return rv;
574 mStreams[i].mCurrentPos = remaining;
575 mCurrentStream = i;
576 mStartedReadingCurrent = remaining != 0;
578 remaining = 0;
579 } else if (remaining > streamPos) {
580 if (i < oldCurrentStream) {
581 // We're already at end so no need to seek this stream
582 remaining -= streamPos;
583 NS_ASSERTION(remaining >= 0, "Remaining invalid");
584 } else {
585 uint64_t avail;
586 rv = AvailableMaybeSeek(mStreams[i], &avail);
587 if (NS_WARN_IF(NS_FAILED(rv))) {
588 return rv;
591 int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);
593 rv = stream->Seek(NS_SEEK_SET, newPos);
594 if (NS_WARN_IF(NS_FAILED(rv))) {
595 return rv;
598 mStreams[i].mCurrentPos = newPos;
599 mCurrentStream = i;
600 mStartedReadingCurrent = true;
602 remaining -= newPos;
603 NS_ASSERTION(remaining >= 0, "Remaining invalid");
605 } else {
606 NS_ASSERTION(remaining == streamPos, "Huh?");
607 MOZ_ASSERT(remaining != 0, "Zero remaining should be handled earlier");
608 remaining = 0;
609 mCurrentStream = i;
610 mStartedReadingCurrent = true;
614 return NS_OK;
617 if (aWhence == NS_SEEK_CUR && aOffset > 0) {
618 int64_t remaining = aOffset;
619 for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
620 uint64_t avail;
621 rv = AvailableMaybeSeek(mStreams[i], &avail);
622 if (NS_WARN_IF(NS_FAILED(rv))) {
623 return rv;
626 int64_t seek = XPCOM_MIN((int64_t)avail, remaining);
628 rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, seek);
629 if (NS_WARN_IF(NS_FAILED(rv))) {
630 return rv;
633 mStreams[i].mCurrentPos += seek;
634 mCurrentStream = i;
635 mStartedReadingCurrent = true;
637 remaining -= seek;
640 return NS_OK;
643 if (aWhence == NS_SEEK_CUR && aOffset < 0) {
644 int64_t remaining = -aOffset;
645 for (uint32_t i = mCurrentStream; remaining && i != (uint32_t)-1; --i) {
646 int64_t pos = mStreams[i].mCurrentPos;
648 int64_t seek = XPCOM_MIN(pos, remaining);
650 rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, -seek);
651 if (NS_WARN_IF(NS_FAILED(rv))) {
652 return rv;
655 mStreams[i].mCurrentPos -= seek;
656 mCurrentStream = i;
657 mStartedReadingCurrent = seek != -pos;
659 remaining -= seek;
662 return NS_OK;
665 if (aWhence == NS_SEEK_CUR) {
666 NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");
668 return NS_OK;
671 if (aWhence == NS_SEEK_END) {
672 if (aOffset > 0) {
673 return NS_ERROR_INVALID_ARG;
676 int64_t remaining = aOffset;
677 int32_t i;
678 for (i = mStreams.Length() - 1; i >= 0; --i) {
679 nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
681 uint64_t avail;
682 rv = AvailableMaybeSeek(mStreams[i], &avail);
683 if (NS_WARN_IF(NS_FAILED(rv))) {
684 return rv;
687 int64_t streamLength = avail + mStreams[i].mCurrentPos;
689 // The seek(END) can be completed in the current stream.
690 if (streamLength >= DeprecatedAbs(remaining)) {
691 rv = stream->Seek(NS_SEEK_END, remaining);
692 if (NS_WARN_IF(NS_FAILED(rv))) {
693 return rv;
696 mStreams[i].mCurrentPos = streamLength + remaining;
697 mCurrentStream = i;
698 mStartedReadingCurrent = true;
699 break;
702 // We are at the beginning of this stream.
703 rv = stream->Seek(NS_SEEK_SET, 0);
704 if (NS_WARN_IF(NS_FAILED(rv))) {
705 return rv;
708 remaining += streamLength;
709 mStreams[i].mCurrentPos = 0;
712 // Any other stream must be set to the end.
713 for (--i; i >= 0; --i) {
714 nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
716 uint64_t avail;
717 rv = AvailableMaybeSeek(mStreams[i], &avail);
718 if (NS_WARN_IF(NS_FAILED(rv))) {
719 return rv;
722 int64_t streamLength = avail + mStreams[i].mCurrentPos;
724 rv = stream->Seek(NS_SEEK_END, 0);
725 if (NS_WARN_IF(NS_FAILED(rv))) {
726 return rv;
729 mStreams[i].mCurrentPos = streamLength;
732 return NS_OK;
735 // other Seeks not implemented yet
736 return NS_ERROR_NOT_IMPLEMENTED;
739 NS_IMETHODIMP
740 nsMultiplexInputStream::Tell(int64_t* aResult) {
741 MutexAutoLock lock(mLock);
743 if (NS_FAILED(mStatus)) {
744 return mStatus;
747 int64_t ret64 = 0;
748 #ifdef DEBUG
749 bool zeroFound = false;
750 #endif
752 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
753 ret64 += mStreams[i].mCurrentPos;
755 #ifdef DEBUG
756 // When we see 1 stream with currentPos = 0, all the remaining streams must
757 // be set to 0 as well.
758 MOZ_ASSERT_IF(zeroFound, mStreams[i].mCurrentPos == 0);
759 if (mStreams[i].mCurrentPos == 0) {
760 zeroFound = true;
762 #endif
764 *aResult = ret64;
766 return NS_OK;
769 NS_IMETHODIMP
770 nsMultiplexInputStream::SetEOF() { return NS_ERROR_NOT_IMPLEMENTED; }
772 NS_IMETHODIMP
773 nsMultiplexInputStream::CloseWithStatus(nsresult aStatus) { return Close(); }
775 // This class is used to inform nsMultiplexInputStream that it's time to execute
776 // the asyncWait callback.
777 class AsyncWaitRunnable final : public DiscardableRunnable {
778 RefPtr<nsMultiplexInputStream> mStream;
780 public:
781 static void Create(nsMultiplexInputStream* aStream,
782 nsIEventTarget* aEventTarget) {
783 RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(aStream);
784 if (aEventTarget) {
785 aEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
786 } else {
787 runnable->Run();
791 NS_IMETHOD
792 Run() override {
793 mStream->AsyncWaitCompleted();
794 return NS_OK;
797 private:
798 explicit AsyncWaitRunnable(nsMultiplexInputStream* aStream)
799 : DiscardableRunnable("AsyncWaitRunnable"), mStream(aStream) {
800 MOZ_ASSERT(aStream);
804 NS_IMETHODIMP
805 nsMultiplexInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
806 uint32_t aFlags, uint32_t aRequestedCount,
807 nsIEventTarget* aEventTarget) {
809 MutexAutoLock lock(mLock);
811 // We must execute the callback also when the stream is closed.
812 if (NS_FAILED(mStatus) && mStatus != NS_BASE_STREAM_CLOSED) {
813 return mStatus;
816 if (mAsyncWaitCallback && aCallback) {
817 return NS_ERROR_FAILURE;
820 mAsyncWaitCallback = aCallback;
821 mAsyncWaitFlags = aFlags;
822 mAsyncWaitRequestedCount = aRequestedCount;
823 mAsyncWaitEventTarget = aEventTarget;
825 if (!mAsyncWaitCallback) {
826 return NS_OK;
830 return AsyncWaitInternal();
833 nsresult nsMultiplexInputStream::AsyncWaitInternal() {
834 nsCOMPtr<nsIAsyncInputStream> stream;
835 uint32_t asyncWaitFlags = 0;
836 uint32_t asyncWaitRequestedCount = 0;
837 nsCOMPtr<nsIEventTarget> asyncWaitEventTarget;
840 MutexAutoLock lock(mLock);
842 // Let's take the first async stream if we are not already closed, and if
843 // it has data to read or if it async.
844 if (mStatus != NS_BASE_STREAM_CLOSED) {
845 for (; mCurrentStream < mStreams.Length(); ++mCurrentStream) {
846 stream = mStreams[mCurrentStream].mAsyncStream;
847 if (stream) {
848 break;
851 uint64_t avail = 0;
852 nsresult rv = AvailableMaybeSeek(mStreams[mCurrentStream], &avail);
853 if (rv == NS_BASE_STREAM_CLOSED || (NS_SUCCEEDED(rv) && avail == 0)) {
854 // Nothing to read here. Let's move on.
855 continue;
858 if (NS_FAILED(rv)) {
859 return rv;
862 break;
866 asyncWaitFlags = mAsyncWaitFlags;
867 asyncWaitRequestedCount = mAsyncWaitRequestedCount;
868 asyncWaitEventTarget = mAsyncWaitEventTarget;
871 MOZ_ASSERT_IF(stream, NS_SUCCEEDED(mStatus));
873 // If we are here it's because we are already closed, or if the current stream
874 // is not async. In both case we have to execute the callback.
875 if (!stream) {
876 AsyncWaitRunnable::Create(this, asyncWaitEventTarget);
877 return NS_OK;
880 return stream->AsyncWait(this, asyncWaitFlags, asyncWaitRequestedCount,
881 asyncWaitEventTarget);
884 NS_IMETHODIMP
885 nsMultiplexInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
886 nsCOMPtr<nsIInputStreamCallback> callback;
888 // When OnInputStreamReady is called, we could be in 2 scenarios:
889 // a. there is something to read;
890 // b. the stream is closed.
891 // But if the stream is closed and it was not the last one, we must proceed
892 // with the following stream in order to have something to read by the callee.
895 MutexAutoLock lock(mLock);
897 // The callback has been nullified in the meantime.
898 if (!mAsyncWaitCallback) {
899 return NS_OK;
902 if (NS_SUCCEEDED(mStatus)) {
903 uint64_t avail = 0;
904 nsresult rv = aStream->Available(&avail);
905 if (rv == NS_BASE_STREAM_CLOSED || avail == 0) {
906 // This stream is closed or empty, let's move to the following one.
907 ++mCurrentStream;
908 MutexAutoUnlock unlock(mLock);
909 return AsyncWaitInternal();
913 mAsyncWaitCallback.swap(callback);
914 mAsyncWaitEventTarget = nullptr;
917 return callback->OnInputStreamReady(this);
920 void nsMultiplexInputStream::AsyncWaitCompleted() {
921 nsCOMPtr<nsIInputStreamCallback> callback;
924 MutexAutoLock lock(mLock);
926 // The callback has been nullified in the meantime.
927 if (!mAsyncWaitCallback) {
928 return;
931 mAsyncWaitCallback.swap(callback);
932 mAsyncWaitEventTarget = nullptr;
935 callback->OnInputStreamReady(this);
938 nsresult nsMultiplexInputStreamConstructor(nsISupports* aOuter, REFNSIID aIID,
939 void** aResult) {
940 *aResult = nullptr;
942 if (aOuter) {
943 return NS_ERROR_NO_AGGREGATION;
946 RefPtr<nsMultiplexInputStream> inst = new nsMultiplexInputStream();
948 return inst->QueryInterface(aIID, aResult);
951 void nsMultiplexInputStream::Serialize(
952 InputStreamParams& aParams, FileDescriptorArray& aFileDescriptors,
953 bool aDelayedStart, uint32_t aMaxSize, uint32_t* aSizeUsed,
954 mozilla::ipc::ParentToChildStreamActorManager* aManager) {
955 SerializeInternal(aParams, aFileDescriptors, aDelayedStart, aMaxSize,
956 aSizeUsed, aManager);
959 void nsMultiplexInputStream::Serialize(
960 InputStreamParams& aParams, FileDescriptorArray& aFileDescriptors,
961 bool aDelayedStart, uint32_t aMaxSize, uint32_t* aSizeUsed,
962 mozilla::ipc::ChildToParentStreamActorManager* aManager) {
963 SerializeInternal(aParams, aFileDescriptors, aDelayedStart, aMaxSize,
964 aSizeUsed, aManager);
967 template <typename M>
968 void nsMultiplexInputStream::SerializeInternal(
969 InputStreamParams& aParams, FileDescriptorArray& aFileDescriptors,
970 bool aDelayedStart, uint32_t aMaxSize, uint32_t* aSizeUsed, M* aManager) {
971 MutexAutoLock lock(mLock);
973 MultiplexInputStreamParams params;
975 CheckedUint32 totalSizeUsed = 0;
976 CheckedUint32 maxSize = aMaxSize;
978 uint32_t streamCount = mStreams.Length();
979 if (streamCount) {
980 nsTArray<InputStreamParams>& streams = params.streams();
982 streams.SetCapacity(streamCount);
983 for (uint32_t index = 0; index < streamCount; index++) {
984 uint32_t sizeUsed = 0;
985 InputStreamHelper::SerializeInputStream(
986 mStreams[index].mOriginalStream, *streams.AppendElement(),
987 aFileDescriptors, aDelayedStart, maxSize.value(), &sizeUsed,
988 aManager);
990 MOZ_ASSERT(maxSize.value() >= sizeUsed);
992 maxSize -= sizeUsed;
993 MOZ_DIAGNOSTIC_ASSERT(maxSize.isValid());
995 totalSizeUsed += sizeUsed;
996 MOZ_DIAGNOSTIC_ASSERT(totalSizeUsed.isValid());
1000 params.currentStream() = mCurrentStream;
1001 params.status() = mStatus;
1002 params.startedReadingCurrent() = mStartedReadingCurrent;
1004 aParams = std::move(params);
1006 MOZ_ASSERT(aSizeUsed);
1007 *aSizeUsed = totalSizeUsed.value();
1010 bool nsMultiplexInputStream::Deserialize(
1011 const InputStreamParams& aParams,
1012 const FileDescriptorArray& aFileDescriptors) {
1013 if (aParams.type() != InputStreamParams::TMultiplexInputStreamParams) {
1014 NS_ERROR("Received unknown parameters from the other process!");
1015 return false;
1018 const MultiplexInputStreamParams& params =
1019 aParams.get_MultiplexInputStreamParams();
1021 const nsTArray<InputStreamParams>& streams = params.streams();
1023 uint32_t streamCount = streams.Length();
1024 for (uint32_t index = 0; index < streamCount; index++) {
1025 nsCOMPtr<nsIInputStream> stream = InputStreamHelper::DeserializeInputStream(
1026 streams[index], aFileDescriptors);
1027 if (!stream) {
1028 NS_WARNING("Deserialize failed!");
1029 return false;
1032 if (NS_FAILED(AppendStream(stream))) {
1033 NS_WARNING("AppendStream failed!");
1034 return false;
1038 mCurrentStream = params.currentStream();
1039 mStatus = params.status();
1040 mStartedReadingCurrent = params.startedReadingCurrent();
1042 return true;
1045 NS_IMETHODIMP
1046 nsMultiplexInputStream::GetCloneable(bool* aCloneable) {
1047 MutexAutoLock lock(mLock);
1048 // XXXnsm Cloning a multiplex stream which has started reading is not
1049 // permitted right now.
1050 if (mCurrentStream > 0 || mStartedReadingCurrent) {
1051 *aCloneable = false;
1052 return NS_OK;
1055 uint32_t len = mStreams.Length();
1056 for (uint32_t i = 0; i < len; ++i) {
1057 nsCOMPtr<nsICloneableInputStream> cis =
1058 do_QueryInterface(mStreams[i].mBufferedStream);
1059 if (!cis || !cis->GetCloneable()) {
1060 *aCloneable = false;
1061 return NS_OK;
1065 *aCloneable = true;
1066 return NS_OK;
1069 NS_IMETHODIMP
1070 nsMultiplexInputStream::Clone(nsIInputStream** aClone) {
1071 MutexAutoLock lock(mLock);
1073 // XXXnsm Cloning a multiplex stream which has started reading is not
1074 // permitted right now.
1075 if (mCurrentStream > 0 || mStartedReadingCurrent) {
1076 return NS_ERROR_FAILURE;
1079 RefPtr<nsMultiplexInputStream> clone = new nsMultiplexInputStream();
1081 nsresult rv;
1082 uint32_t len = mStreams.Length();
1083 for (uint32_t i = 0; i < len; ++i) {
1084 nsCOMPtr<nsICloneableInputStream> substream =
1085 do_QueryInterface(mStreams[i].mBufferedStream);
1086 if (NS_WARN_IF(!substream)) {
1087 return NS_ERROR_FAILURE;
1090 nsCOMPtr<nsIInputStream> clonedSubstream;
1091 rv = substream->Clone(getter_AddRefs(clonedSubstream));
1092 if (NS_WARN_IF(NS_FAILED(rv))) {
1093 return rv;
1096 rv = clone->AppendStream(clonedSubstream);
1097 if (NS_WARN_IF(NS_FAILED(rv))) {
1098 return rv;
1102 clone.forget(aClone);
1103 return NS_OK;
1106 NS_IMETHODIMP
1107 nsMultiplexInputStream::Length(int64_t* aLength) {
1108 MutexAutoLock lock(mLock);
1110 if (mCurrentStream > 0 || mStartedReadingCurrent) {
1111 return NS_ERROR_NOT_AVAILABLE;
1114 CheckedInt64 length = 0;
1115 nsresult retval = NS_OK;
1117 for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
1118 nsCOMPtr<nsIInputStreamLength> substream =
1119 do_QueryInterface(mStreams[i].mBufferedStream);
1120 if (!substream) {
1121 // Let's use available as fallback.
1122 uint64_t streamAvail = 0;
1123 nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
1124 if (rv == NS_BASE_STREAM_CLOSED) {
1125 continue;
1128 if (NS_WARN_IF(NS_FAILED(rv))) {
1129 mStatus = rv;
1130 return mStatus;
1133 length += streamAvail;
1134 if (!length.isValid()) {
1135 return NS_ERROR_OUT_OF_MEMORY;
1138 continue;
1141 int64_t size = 0;
1142 nsresult rv = substream->Length(&size);
1143 if (rv == NS_BASE_STREAM_CLOSED) {
1144 continue;
1147 if (rv == NS_ERROR_NOT_AVAILABLE) {
1148 return rv;
1151 // If one stream blocks, we all block.
1152 if (rv != NS_BASE_STREAM_WOULD_BLOCK && NS_WARN_IF(NS_FAILED(rv))) {
1153 return rv;
1156 // We want to return WOULD_BLOCK if there is 1 stream that blocks. But want
1157 // to see if there are other streams with length = -1.
1158 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
1159 retval = NS_BASE_STREAM_WOULD_BLOCK;
1160 continue;
1163 // If one of the stream doesn't know the size, we all don't know the size.
1164 if (size == -1) {
1165 *aLength = -1;
1166 return NS_OK;
1169 length += size;
1170 if (!length.isValid()) {
1171 return NS_ERROR_OUT_OF_MEMORY;
1175 *aLength = length.value();
1176 return retval;
1179 class nsMultiplexInputStream::AsyncWaitLengthHelper final
1180 : public nsIInputStreamLengthCallback
1183 public:
1184 NS_DECL_ISUPPORTS
1186 AsyncWaitLengthHelper()
1187 : mStreamNotified(false), mLength(0), mNegativeSize(false) {}
1189 bool AddStream(nsIAsyncInputStreamLength* aStream) {
1190 return mPendingStreams.AppendElement(aStream, fallible);
1193 bool AddSize(int64_t aSize) {
1194 MOZ_ASSERT(!mNegativeSize);
1196 mLength += aSize;
1197 return mLength.isValid();
1200 void NegativeSize() {
1201 MOZ_ASSERT(!mNegativeSize);
1202 mNegativeSize = true;
1205 nsresult Proceed(nsMultiplexInputStream* aParentStream,
1206 nsIEventTarget* aEventTarget,
1207 const MutexAutoLock& aProofOfLock) {
1208 MOZ_ASSERT(!mStream);
1210 // If we don't need to wait, let's inform the callback immediately.
1211 if (mPendingStreams.IsEmpty() || mNegativeSize) {
1212 RefPtr<nsMultiplexInputStream> parentStream = aParentStream;
1213 int64_t length = -1;
1214 if (!mNegativeSize && mLength.isValid()) {
1215 length = mLength.value();
1217 nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
1218 "AsyncWaitLengthHelper", [parentStream, length]() {
1219 MutexAutoLock lock(parentStream->GetLock());
1220 parentStream->AsyncWaitCompleted(length, lock);
1222 return aEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
1225 // Let's store the callback and the parent stream until we have
1226 // notifications from the async length streams.
1228 mStream = aParentStream;
1230 // Let's activate all the pending streams.
1231 for (nsIAsyncInputStreamLength* stream : mPendingStreams) {
1232 nsresult rv = stream->AsyncLengthWait(this, aEventTarget);
1233 if (rv == NS_BASE_STREAM_CLOSED) {
1234 continue;
1237 if (NS_WARN_IF(NS_FAILED(rv))) {
1238 return rv;
1242 return NS_OK;
1245 NS_IMETHOD
1246 OnInputStreamLengthReady(nsIAsyncInputStreamLength* aStream,
1247 int64_t aLength) override {
1248 MutexAutoLock lock(mStream->GetLock());
1250 MOZ_ASSERT(mPendingStreams.Contains(aStream));
1251 mPendingStreams.RemoveElement(aStream);
1253 // Already notified.
1254 if (mStreamNotified) {
1255 return NS_OK;
1258 if (aLength == -1) {
1259 mNegativeSize = true;
1260 } else {
1261 mLength += aLength;
1262 if (!mLength.isValid()) {
1263 mNegativeSize = true;
1267 // We need to wait.
1268 if (!mNegativeSize && !mPendingStreams.IsEmpty()) {
1269 return NS_OK;
1272 // Let's notify the parent stream.
1273 mStreamNotified = true;
1274 mStream->AsyncWaitCompleted(mNegativeSize ? -1 : mLength.value(), lock);
1275 return NS_OK;
1278 private:
1279 ~AsyncWaitLengthHelper() = default;
1281 RefPtr<nsMultiplexInputStream> mStream;
1282 bool mStreamNotified;
1284 CheckedInt64 mLength;
1285 bool mNegativeSize;
1287 nsTArray<nsCOMPtr<nsIAsyncInputStreamLength>> mPendingStreams;
1290 NS_IMPL_ISUPPORTS(nsMultiplexInputStream::AsyncWaitLengthHelper,
1291 nsIInputStreamLengthCallback)
1293 NS_IMETHODIMP
1294 nsMultiplexInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback,
1295 nsIEventTarget* aEventTarget) {
1296 if (NS_WARN_IF(!aEventTarget)) {
1297 return NS_ERROR_NULL_POINTER;
1300 MutexAutoLock lock(mLock);
1302 if (mCurrentStream > 0 || mStartedReadingCurrent) {
1303 return NS_ERROR_NOT_AVAILABLE;
1306 if (!aCallback) {
1307 mAsyncWaitLengthCallback = nullptr;
1308 return NS_OK;
1311 // We have a pending operation! Let's use this instead of creating a new one.
1312 if (mAsyncWaitLengthHelper) {
1313 mAsyncWaitLengthCallback = aCallback;
1314 return NS_OK;
1317 RefPtr<AsyncWaitLengthHelper> helper = new AsyncWaitLengthHelper();
1319 for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
1320 nsCOMPtr<nsIAsyncInputStreamLength> asyncStream =
1321 do_QueryInterface(mStreams[i].mBufferedStream);
1322 if (asyncStream) {
1323 if (NS_WARN_IF(!helper->AddStream(asyncStream))) {
1324 return NS_ERROR_OUT_OF_MEMORY;
1326 continue;
1329 nsCOMPtr<nsIInputStreamLength> stream =
1330 do_QueryInterface(mStreams[i].mBufferedStream);
1331 if (!stream) {
1332 // Let's use available as fallback.
1333 uint64_t streamAvail = 0;
1334 nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
1335 if (rv == NS_BASE_STREAM_CLOSED) {
1336 continue;
1339 if (NS_WARN_IF(NS_FAILED(rv))) {
1340 mStatus = rv;
1341 return mStatus;
1344 if (NS_WARN_IF(!helper->AddSize(streamAvail))) {
1345 return NS_ERROR_OUT_OF_MEMORY;
1348 continue;
1351 int64_t size = 0;
1352 nsresult rv = stream->Length(&size);
1353 if (rv == NS_BASE_STREAM_CLOSED) {
1354 continue;
1357 MOZ_ASSERT(rv != NS_BASE_STREAM_WOULD_BLOCK,
1358 "A nsILengthInutStream returns NS_BASE_STREAM_WOULD_BLOCK but "
1359 "it doesn't implement nsIAsyncInputStreamLength.");
1361 if (NS_WARN_IF(NS_FAILED(rv))) {
1362 return rv;
1365 if (size == -1) {
1366 helper->NegativeSize();
1367 break;
1370 if (NS_WARN_IF(!helper->AddSize(size))) {
1371 return NS_ERROR_OUT_OF_MEMORY;
1375 nsresult rv = helper->Proceed(this, aEventTarget, lock);
1376 if (NS_WARN_IF(NS_FAILED(rv))) {
1377 return rv;
1380 mAsyncWaitLengthHelper = helper;
1381 mAsyncWaitLengthCallback = aCallback;
1382 return NS_OK;
1385 void nsMultiplexInputStream::AsyncWaitCompleted(
1386 int64_t aLength, const MutexAutoLock& aProofOfLock) {
1387 nsCOMPtr<nsIInputStreamLengthCallback> callback;
1388 callback.swap(mAsyncWaitLengthCallback);
1390 mAsyncWaitLengthHelper = nullptr;
1392 // Already canceled.
1393 if (!callback) {
1394 return;
1397 MutexAutoUnlock unlock(mLock);
1398 callback->OnInputStreamLengthReady(this, aLength);
1401 #define MAYBE_UPDATE_VALUE_REAL(x, y) \
1402 if (y) { \
1403 ++x; \
1406 #define MAYBE_UPDATE_VALUE(x, y) \
1408 nsCOMPtr<y> substream = do_QueryInterface(aStream.mBufferedStream); \
1409 MAYBE_UPDATE_VALUE_REAL(x, substream) \
1412 void nsMultiplexInputStream::UpdateQIMap(StreamData& aStream) {
1413 MAYBE_UPDATE_VALUE_REAL(mSeekableStreams, aStream.mSeekableStream)
1414 MAYBE_UPDATE_VALUE(mIPCSerializableStreams, nsIIPCSerializableInputStream)
1415 MAYBE_UPDATE_VALUE(mCloneableStreams, nsICloneableInputStream)
1416 MAYBE_UPDATE_VALUE_REAL(mAsyncInputStreams, aStream.mAsyncStream)
1417 MAYBE_UPDATE_VALUE(mInputStreamLengths, nsIInputStreamLength)
1418 MAYBE_UPDATE_VALUE(mAsyncInputStreamLengths, nsIAsyncInputStreamLength)
1421 #undef MAYBE_UPDATE_VALUE
1423 bool nsMultiplexInputStream::IsSeekable() const {
1424 return mStreams.Length() == mSeekableStreams;
1427 bool nsMultiplexInputStream::IsIPCSerializable() const {
1428 return mStreams.Length() == mIPCSerializableStreams;
1431 bool nsMultiplexInputStream::IsCloneable() const {
1432 return mStreams.Length() == mCloneableStreams;
1435 bool nsMultiplexInputStream::IsAsyncInputStream() const {
1436 // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
1437 // substream implements that interface.
1438 return !!mAsyncInputStreams;
1441 bool nsMultiplexInputStream::IsInputStreamLength() const {
1442 return !!mInputStreamLengths;
1445 bool nsMultiplexInputStream::IsAsyncInputStreamLength() const {
1446 return !!mAsyncInputStreamLengths;