1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/Mutex.h"
8 #include "mozilla/Attributes.h"
9 #include "nsStreamUtils.h"
11 #include "nsICloneableInputStream.h"
12 #include "nsIEventTarget.h"
13 #include "nsICancelableRunnable.h"
14 #include "nsISafeOutputStream.h"
16 #include "nsIAsyncInputStream.h"
17 #include "nsIAsyncOutputStream.h"
18 #include "nsIBufferedStreams.h"
21 #include "nsServiceManagerUtils.h"
22 #include "nsThreadUtils.h"
23 #include "nsITransport.h"
24 #include "nsIStreamTransportService.h"
25 #include "NonBlockingAsyncInputStream.h"
27 using namespace mozilla
;
29 static NS_DEFINE_CID(kStreamTransportServiceCID
, NS_STREAMTRANSPORTSERVICE_CID
);
31 //-----------------------------------------------------------------------------
33 // This is a nsICancelableRunnable because we can dispatch it to Workers and
34 // those can be shut down at any time, and in these cases, Cancel() is called
// Runnable that relays an input-stream-ready notification to a callback via
// an event target. It derives from CancelableRunnable and also implements
// nsIInputStreamCallback and nsIRunnablePriority.
// NOTE(review): this listing appears to have lost lines (access specifiers,
// closing braces, some statements); the comments below describe only what is
// visible here.
36 class nsInputStreamReadyEvent final
: public CancelableRunnable
,
37 public nsIInputStreamCallback
,
38 public nsIRunnablePriority
{
40 NS_DECL_ISUPPORTS_INHERITED
// Constructor: passes aName to the CancelableRunnable base and records
// aPriority in mPriority.
// NOTE(review): initializers for the callback/target members are not visible
// in this listing -- confirm against the full file.
42 nsInputStreamReadyEvent(const char* aName
, nsIInputStreamCallback
* aCallback
,
43 nsIEventTarget
* aTarget
, uint32_t aPriority
)
44 : CancelableRunnable(aName
),
47 mPriority(aPriority
) {}
// Destructor: handles the case described in the comment below, where the
// callback is still held and must be released on mTarget's thread.
50 ~nsInputStreamReadyEvent() {
55 // whoa!! looks like we never posted this event. take care to
56 // release mCallback on the correct thread. if mTarget lives on the
57 // calling thread, then we are ok. otherwise, we have to try to
58 // proxy the Release over the right thread. if that thread is dead,
59 // then there's nothing we can do... better to leak than crash.
// Ask the target whether we are already on its thread.
62 nsresult rv
= mTarget
->IsOnCurrentThread(&val
);
// If the query failed or we are on a different thread, wrap the callback in
// a fresh ready-event bound to the same target and priority...
63 if (NS_FAILED(rv
) || !val
) {
64 nsCOMPtr
<nsIInputStreamCallback
> event
= NS_NewInputStreamReadyEvent(
65 "~nsInputStreamReadyEvent", mCallback
, mTarget
, mPriority
);
// ...and trigger it with a null stream so that it gets dispatched to mTarget.
68 rv
= event
->OnInputStreamReady(nullptr);
// Fallback path for a failed hand-off (see "better to leak than crash").
// NOTE(review): the condition guarding these lines is not visible here.
70 MOZ_ASSERT_UNREACHABLE("leaking stream event");
71 nsISupports
* sup
= event
;
// nsIInputStreamCallback: dispatches this runnable to mTarget. The visible
// failure path warns and returns NS_ERROR_FAILURE.
// NOTE(review): the store of aStream and the success return are not visible.
79 NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream
* aStream
) override
{
82 nsresult rv
= mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
84 NS_WARNING("Dispatch failed");
85 return NS_ERROR_FAILURE
;
// Runnable entry point: forwards the notification to mCallback, passing the
// stored stream.
91 NS_IMETHOD
Run() override
{
94 mCallback
->OnInputStreamReady(mStream
);
// CancelableRunnable override; its body is not visible in this listing.
101 nsresult
Cancel() override
{
// nsIRunnablePriority: writes the priority stored at construction into
// *aPriority.
106 NS_IMETHOD
GetPriority(uint32_t* aPriority
) override
{
107 *aPriority
= mPriority
;
// Stream handed to mCallback from Run().
112 nsCOMPtr
<nsIAsyncInputStream
> mStream
;
// Callback that ultimately receives the notification.
113 nsCOMPtr
<nsIInputStreamCallback
> mCallback
;
// Event target this runnable dispatches itself to.
114 nsCOMPtr
<nsIEventTarget
> mTarget
;
// Lists the interfaces this class implements in addition to those inherited
// from CancelableRunnable.
118 NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent
, CancelableRunnable
,
119 nsIInputStreamCallback
, nsIRunnablePriority
)
121 //-----------------------------------------------------------------------------
123 // This is a nsICancelableRunnable because we can dispatch it to Workers and
124 // those can be shut down at any time, and in these cases, Cancel() is called
// Output-stream counterpart of nsInputStreamReadyEvent: a CancelableRunnable
// that also implements nsIOutputStreamCallback and relays the notification to
// mCallback via mTarget.
// NOTE(review): this listing appears to have lost lines (access specifiers,
// closing braces, some statements); comments describe only what is visible.
126 class nsOutputStreamReadyEvent final
: public CancelableRunnable
,
127 public nsIOutputStreamCallback
{
129 NS_DECL_ISUPPORTS_INHERITED
// Constructor: names the runnable "nsOutputStreamReadyEvent" and stores
// aCallback in mCallback.
// NOTE(review): the initializer for the target member is not visible here.
131 nsOutputStreamReadyEvent(nsIOutputStreamCallback
* aCallback
,
132 nsIEventTarget
* aTarget
)
133 : CancelableRunnable("nsOutputStreamReadyEvent"),
134 mCallback(aCallback
),
// Destructor: same release-on-the-right-thread logic as described in the
// comment below.
138 ~nsOutputStreamReadyEvent() {
143 // whoa!! looks like we never posted this event. take care to
144 // release mCallback on the correct thread. if mTarget lives on the
145 // calling thread, then we are ok. otherwise, we have to try to
146 // proxy the Release over the right thread. if that thread is dead,
147 // then there's nothing we can do... better to leak than crash.
// Ask the target whether we are already on its thread.
150 nsresult rv
= mTarget
->IsOnCurrentThread(&val
);
// If not (or if the query failed), wrap the callback in a new ready-event
// bound to the same target and fire it with a null stream.
151 if (NS_FAILED(rv
) || !val
) {
152 nsCOMPtr
<nsIOutputStreamCallback
> event
=
153 NS_NewOutputStreamReadyEvent(mCallback
, mTarget
);
156 rv
= event
->OnOutputStreamReady(nullptr);
// Fallback path for a failed hand-off (see "better to leak than crash").
// NOTE(review): the condition guarding these lines is not visible here.
158 MOZ_ASSERT_UNREACHABLE("leaking stream event");
159 nsISupports
* sup
= event
;
// nsIOutputStreamCallback: dispatches this runnable to mTarget. The visible
// failure path warns and returns NS_ERROR_FAILURE.
167 NS_IMETHOD
OnOutputStreamReady(nsIAsyncOutputStream
* aStream
) override
{
170 nsresult rv
= mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
172 NS_WARNING("PostEvent failed");
173 return NS_ERROR_FAILURE
;
// Runnable entry point: forwards the notification to mCallback, passing the
// stored stream.
179 NS_IMETHOD
Run() override
{
182 mCallback
->OnOutputStreamReady(mStream
);
// CancelableRunnable override; its body is not visible in this listing.
189 nsresult
Cancel() override
{
// Stream handed to mCallback from Run().
195 nsCOMPtr
<nsIAsyncOutputStream
> mStream
;
// Callback that ultimately receives the notification.
196 nsCOMPtr
<nsIOutputStreamCallback
> mCallback
;
// Event target this runnable dispatches itself to.
197 nsCOMPtr
<nsIEventTarget
> mTarget
;
// Lists the interface this class implements in addition to those inherited
// from CancelableRunnable.
200 NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent
, CancelableRunnable
,
201 nsIOutputStreamCallback
)
203 //-----------------------------------------------------------------------------
// Factory for nsInputStreamReadyEvent. Asserts that aCallback and aTarget are
// non-null, then allocates the event from the four arguments. The declared
// return type is already_AddRefed<nsIInputStreamCallback>.
// NOTE(review): the return statement is not visible in this listing.
205 already_AddRefed
<nsIInputStreamCallback
> NS_NewInputStreamReadyEvent(
206 const char* aName
, nsIInputStreamCallback
* aCallback
,
207 nsIEventTarget
* aTarget
, uint32_t aPriority
) {
208 NS_ASSERTION(aCallback
, "null callback");
209 NS_ASSERTION(aTarget
, "null target");
210 RefPtr
<nsInputStreamReadyEvent
> ev
=
211 new nsInputStreamReadyEvent(aName
, aCallback
, aTarget
, aPriority
);
// Factory for nsOutputStreamReadyEvent. Asserts that aCallback and aTarget are
// non-null, then allocates the event from them. The declared return type is
// already_AddRefed<nsIOutputStreamCallback>.
// NOTE(review): the return statement is not visible in this listing.
215 already_AddRefed
<nsIOutputStreamCallback
> NS_NewOutputStreamReadyEvent(
216 nsIOutputStreamCallback
* aCallback
, nsIEventTarget
* aTarget
) {
217 NS_ASSERTION(aCallback
, "null callback");
218 NS_ASSERTION(aTarget
, "null target");
219 RefPtr
<nsOutputStreamReadyEvent
> ev
=
220 new nsOutputStreamReadyEvent(aCallback
, aTarget
);
224 //-----------------------------------------------------------------------------
225 // NS_AsyncCopy implementation
227 // abstract stream copier...
// Abstract base for the asynchronous stream copiers. It is both a stream
// callback (input and output) and a CancelableRunnable: stream notifications
// post a continuation event, and the actual copy step is delegated to the
// pure-virtual DoCopy().
// NOTE(review): this listing appears to have lost lines (access specifiers,
// some method headers, local declarations, closing braces); the comments
// describe only what is visible here.
228 class nsAStreamCopier
: public nsIInputStreamCallback
,
229 public nsIOutputStreamCallback
,
230 public CancelableRunnable
{
232 NS_DECL_ISUPPORTS_INHERITED
// Constructor initializer list (the constructor header itself is not visible):
// names the runnable and the mutex, clears the progress callback and the two
// event flags, and starts with mCancelStatus == NS_OK.
235 : CancelableRunnable("nsAStreamCopier"),
236 mLock("nsAStreamCopier.mLock"),
238 mProgressCallback(nullptr),
241 mEventInProcess(false),
242 mEventIsPending(false),
246 mCancelStatus(NS_OK
) {}
248 // kick off the async copy...
// Stores the copy parameters, queries the async interfaces of the source and
// sink, and posts the first continuation event; the result of that post is
// returned.
// NOTE(review): the assignments of mSource/mSink/mTarget/mClosure are not
// visible in this listing.
249 nsresult
Start(nsIInputStream
* aSource
, nsIOutputStream
* aSink
,
250 nsIEventTarget
* aTarget
, nsAsyncCopyCallbackFun aCallback
,
251 void* aClosure
, uint32_t aChunksize
, bool aCloseSource
,
252 bool aCloseSink
, nsAsyncCopyProgressFun aProgressCallback
) {
256 mCallback
= aCallback
;
258 mChunkSize
= aChunksize
;
259 mCloseSource
= aCloseSource
;
260 mCloseSink
= aCloseSink
;
261 mProgressCallback
= aProgressCallback
;
// These are null when the corresponding stream does not implement the async
// interface; later code checks them before waiting.
263 mAsyncSource
= do_QueryInterface(mSource
);
264 mAsyncSink
= do_QueryInterface(mSink
);
266 return PostContinuationEvent();
269 // implemented by subclasses, returns number of bytes copied and
270 // sets source and sink condition before returning.
271 virtual uint32_t DoCopy(nsresult
* aSourceCondition
,
272 nsresult
* aSinkCondition
) = 0;
// Body of the copy step (its method header is not visible in this listing).
// Starts with a guard on mSource/mSink being set.
275 if (!mSource
|| !mSink
) {
279 nsresult cancelStatus
;
// Snapshot the cancellation state under the lock.
282 MutexAutoLock
lock(mLock
);
283 canceled
= mCanceled
;
284 cancelStatus
= mCancelStatus
;
287 // If the copy was canceled before Process() was even called, then
288 // sourceCondition and sinkCondition should be set to error results to
289 // ensure we don't call Finish() on a canceled nsISafeOutputStream.
290 MOZ_ASSERT(NS_FAILED(cancelStatus
) == canceled
, "cancel needs an error");
291 nsresult sourceCondition
= cancelStatus
;
292 nsresult sinkCondition
= cancelStatus
;
294 // Copy data from the source to the sink until we hit failure or have
295 // copied all the data.
297 // Note: copyFailed will be true if the source or the sink have
298 // reported an error, or if we failed to write any bytes
299 // because we have consumed all of our data.
300 bool copyFailed
= false;
// One copy step; report progress only when bytes actually moved and a
// progress callback was supplied.
302 uint32_t n
= DoCopy(&sourceCondition
, &sinkCondition
);
303 if (n
> 0 && mProgressCallback
) {
304 mProgressCallback(mClosure
, n
);
// Right-hand side of the copyFailed update described in the note above (the
// assignment's left-hand side is not visible in this listing).
307 NS_FAILED(sourceCondition
) || NS_FAILED(sinkCondition
) || n
== 0;
// Re-read the cancellation state under the lock after the copy step.
309 MutexAutoLock
lock(mLock
);
310 canceled
= mCanceled
;
311 cancelStatus
= mCancelStatus
;
// Not canceled but no progress: if one side merely would block and supports
// async waiting, register for notification instead of finishing.
313 if (copyFailed
&& !canceled
) {
314 if (sourceCondition
== NS_BASE_STREAM_WOULD_BLOCK
&& mAsyncSource
) {
315 // need to wait for more data from source. while waiting for
316 // more source data, be sure to observe failures on output end.
317 mAsyncSource
->AsyncWait(this, 0, 0, nullptr);
320 mAsyncSink
->AsyncWait(this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY
,
323 } else if (sinkCondition
== NS_BASE_STREAM_WOULD_BLOCK
&& mAsyncSink
) {
324 // need to wait for more room in the sink. while waiting for
325 // more room in the sink, be sure to observe failures on the
327 mAsyncSink
->AsyncWait(this, 0, 0, nullptr);
330 mAsyncSource
->AsyncWait(
331 this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY
, 0, nullptr);
// Terminal path: the copy finished, failed, or was canceled. Close the
// streams (using the cancel status when canceled) and drop the async refs.
// NOTE(review): the mCloseSource/mCloseSink checks and the non-canceled arm of
// the conditional expressions are not visible in this listing.
335 if (copyFailed
|| canceled
) {
339 mAsyncSource
->CloseWithStatus(canceled
? cancelStatus
345 mAsyncSource
= nullptr;
351 mAsyncSink
->CloseWithStatus(canceled
? cancelStatus
354 // If we have an nsISafeOutputStream, and our
355 // sourceCondition and sinkCondition are not set to a
356 // failure state, finish writing.
357 nsCOMPtr
<nsISafeOutputStream
> sostream
= do_QueryInterface(mSink
);
358 if (sostream
&& NS_SUCCEEDED(sourceCondition
) &&
359 NS_SUCCEEDED(sinkCondition
)) {
366 mAsyncSink
= nullptr;
369 // notify state complete...
// Pick the status reported to the completion callback: the source condition,
// falling back to the sink condition when the source succeeded; a separate
// visible assignment uses the cancel status.
// NOTE(review): the branch structure around these assignments (and what
// NS_BASE_STREAM_CLOSED is mapped to) is not fully visible here.
373 status
= sourceCondition
;
374 if (NS_SUCCEEDED(status
)) {
375 status
= sinkCondition
;
377 if (status
== NS_BASE_STREAM_CLOSED
) {
381 status
= cancelStatus
;
383 mCallback(mClosure
, status
);
// Requests cancellation with aReason. Takes the lock; a success code is
// replaced by NS_BASE_STREAM_CLOSED (with a warning) before being stored in
// mCancelStatus.
// NOTE(review): the condition guarding the early NS_ERROR_FAILURE return and
// the update of mCanceled are not visible in this listing.
390 nsresult
Cancel(nsresult aReason
) {
391 MutexAutoLock
lock(mLock
);
393 return NS_ERROR_FAILURE
;
396 if (NS_SUCCEEDED(aReason
)) {
397 NS_WARNING("cancel with non-failure status code");
398 aReason
= NS_BASE_STREAM_CLOSED
;
402 mCancelStatus
= aReason
;
// nsIInputStreamCallback: the source became ready -> schedule another step.
406 NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream
* aSource
) override
{
407 PostContinuationEvent();
// nsIOutputStreamCallback: the sink became ready -> schedule another step.
411 NS_IMETHOD
OnOutputStreamReady(nsIAsyncOutputStream
* aSink
) override
{
412 PostContinuationEvent();
416 // continuation event handler
417 NS_IMETHOD
Run() override
{
420 // clear "in process" flag and post any pending continuation event
421 MutexAutoLock
lock(mLock
);
422 mEventInProcess
= false;
423 if (mEventIsPending
) {
424 mEventIsPending
= false;
425 PostContinuationEvent_Locked();
// Parameterless Cancel() is redeclared pure so every concrete copier must
// provide its own.
431 nsresult
Cancel() MOZ_MUST_OVERRIDE override
= 0;
// Locking wrapper around PostContinuationEvent_Locked().
433 nsresult
PostContinuationEvent() {
434 // we cannot post a continuation event if there is currently
435 // an event in process. doing so could result in Process being
436 // run simultaneously on multiple threads, so we mark the event
437 // as pending, and if an event is already in process then we
438 // just let that existing event take care of posting the real
439 // continuation event.
441 MutexAutoLock
lock(mLock
);
442 return PostContinuationEvent_Locked();
// Must be called with mLock held (callers above take the lock first). If an
// event is already in process, just flag one as pending; otherwise dispatch
// this runnable to mTarget and mark it in process on success.
445 nsresult
PostContinuationEvent_Locked() {
447 if (mEventInProcess
) {
448 mEventIsPending
= true;
450 rv
= mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
451 if (NS_SUCCEEDED(rv
)) {
452 mEventInProcess
= true;
454 NS_WARNING("unable to post continuation event");
// Streams being copied, plus their async interfaces (null if unsupported).
461 nsCOMPtr
<nsIInputStream
> mSource
;
462 nsCOMPtr
<nsIOutputStream
> mSink
;
463 nsCOMPtr
<nsIAsyncInputStream
> mAsyncSource
;
464 nsCOMPtr
<nsIAsyncOutputStream
> mAsyncSink
;
// Event target the continuation events are dispatched to.
465 nsCOMPtr
<nsIEventTarget
> mTarget
;
// Completion callback (invoked with mClosure and the final status) and
// optional progress callback (invoked with mClosure and a byte count).
467 nsAsyncCopyCallbackFun mCallback
;
468 nsAsyncCopyProgressFun mProgressCallback
;
// Continuation-event bookkeeping; read and written under mLock above.
471 bool mEventInProcess
;
472 bool mEventIsPending
;
// Status recorded by Cancel(nsresult).
476 nsresult mCancelStatus
;
478 // virtual since subclasses call superclass Release()
479 virtual ~nsAStreamCopier() = default;
// Lists the interfaces this class implements in addition to those inherited
// from CancelableRunnable.
482 NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier
, CancelableRunnable
,
483 nsIInputStreamCallback
, nsIOutputStreamCallback
)
// Concrete copier that drives the copy from the input side: DoCopy() calls
// mSource->ReadSegments() and the segment callback writes each segment to the
// sink.
// NOTE(review): this listing appears to have lost lines (access specifiers,
// local declarations, closing braces); comments describe only what is visible.
485 class nsStreamCopierIB final
: public nsAStreamCopier
{
487 nsStreamCopierIB() : nsAStreamCopier() {}
488 virtual ~nsStreamCopierIB() = default;
// Stack-only state passed to ConsumeInputBuffer through the closure pointer.
490 struct MOZ_STACK_CLASS ReadSegmentsState
{
491 // the nsIOutputStream will outlive the ReadSegmentsState on the stack
492 nsIOutputStream
* MOZ_NON_OWNING_REF mSink
;
// Result of the most recent sink write, reported back to DoCopy().
493 nsresult mSinkCondition
;
// Segment callback for ReadSegments(): writes aCount bytes of aBuffer to the
// sink held in the closure, records the sink condition, and returns it.
// A write of zero bytes is recorded as NS_BASE_STREAM_CLOSED.
// NOTE(review): the failure check that precedes the visible `else if` is not
// shown in this listing.
496 static nsresult
ConsumeInputBuffer(nsIInputStream
* aInStr
, void* aClosure
,
497 const char* aBuffer
, uint32_t aOffset
,
498 uint32_t aCount
, uint32_t* aCountWritten
) {
499 ReadSegmentsState
* state
= (ReadSegmentsState
*)aClosure
;
501 nsresult rv
= state
->mSink
->Write(aBuffer
, aCount
, aCountWritten
);
503 state
->mSinkCondition
= rv
;
504 } else if (*aCountWritten
== 0) {
505 state
->mSinkCondition
= NS_BASE_STREAM_CLOSED
;
508 return state
->mSinkCondition
;
// One copy step: read up to mChunkSize bytes from mSource via ReadSegments,
// letting ConsumeInputBuffer push them into the sink, then report the sink
// condition gathered in `state`.
// NOTE(review): the assignment of state.mSink, the declaration of `n`, the
// store to *aSourceCondition and the return are not visible in this listing.
511 uint32_t DoCopy(nsresult
* aSourceCondition
,
512 nsresult
* aSinkCondition
) override
{
513 ReadSegmentsState state
;
515 state
.mSinkCondition
= NS_OK
;
519 mSource
->ReadSegments(ConsumeInputBuffer
, &state
, mChunkSize
, &n
);
520 *aSinkCondition
= state
.mSinkCondition
;
// Runnable-level cancel: nothing to do here, reports success.
524 nsresult
Cancel() override
{ return NS_OK
; }
// Concrete copier that drives the copy from the output side: DoCopy() calls
// mSink->WriteSegments() and the segment callback fills each segment by
// reading from the source.
// NOTE(review): this listing appears to have lost lines (access specifiers,
// local declarations, closing braces); comments describe only what is visible.
527 class nsStreamCopierOB final
: public nsAStreamCopier
{
529 nsStreamCopierOB() : nsAStreamCopier() {}
530 virtual ~nsStreamCopierOB() = default;
// Stack-only state passed to FillOutputBuffer through the closure pointer.
532 struct MOZ_STACK_CLASS WriteSegmentsState
{
533 // the nsIInputStream will outlive the WriteSegmentsState on the stack
534 nsIInputStream
* MOZ_NON_OWNING_REF mSource
;
// Result of the most recent source read, reported back to DoCopy().
535 nsresult mSourceCondition
;
// Segment callback for WriteSegments(): reads up to aCount bytes from the
// source held in the closure into aBuffer, records the source condition, and
// returns it. A read of zero bytes is recorded as NS_BASE_STREAM_CLOSED.
// NOTE(review): the failure check that precedes the visible `else if` is not
// shown in this listing.
538 static nsresult
FillOutputBuffer(nsIOutputStream
* aOutStr
, void* aClosure
,
539 char* aBuffer
, uint32_t aOffset
,
540 uint32_t aCount
, uint32_t* aCountRead
) {
541 WriteSegmentsState
* state
= (WriteSegmentsState
*)aClosure
;
543 nsresult rv
= state
->mSource
->Read(aBuffer
, aCount
, aCountRead
);
545 state
->mSourceCondition
= rv
;
546 } else if (*aCountRead
== 0) {
547 state
->mSourceCondition
= NS_BASE_STREAM_CLOSED
;
550 return state
->mSourceCondition
;
// One copy step: ask mSink to write up to mChunkSize bytes via WriteSegments,
// letting FillOutputBuffer pull them from mSource, then report the source
// condition gathered in `state`.
// NOTE(review): the declaration of `n`, the store to *aSinkCondition and the
// return are not visible in this listing.
553 uint32_t DoCopy(nsresult
* aSourceCondition
,
554 nsresult
* aSinkCondition
) override
{
555 WriteSegmentsState state
;
556 state
.mSource
= mSource
;
557 state
.mSourceCondition
= NS_OK
;
561 mSink
->WriteSegments(FillOutputBuffer
, &state
, mChunkSize
, &n
);
562 *aSourceCondition
= state
.mSourceCondition
;
// Runnable-level cancel: nothing to do here, reports success.
566 nsresult
Cancel() override
{ return NS_OK
; }
569 //-----------------------------------------------------------------------------
// Starts an asynchronous copy from aSource to aSink on aTarget.
// A copier is chosen from aMode (nsStreamCopierIB for
// NS_ASYNCCOPY_VIA_READSEGMENTS, nsStreamCopierOB in the other visible
// branch), then started with the remaining arguments. The visible lines also
// hand an AddRef'd nsISupports pointer to the copier back through
// *aCopierCtx; NS_CancelAsyncCopy() casts that pointer back.
// NOTE(review): the listing has lost lines here -- the declaration of `rv`,
// any null check on aCopierCtx, the reference handling around Start() and the
// return are not visible.
571 nsresult
NS_AsyncCopy(nsIInputStream
* aSource
, nsIOutputStream
* aSink
,
572 nsIEventTarget
* aTarget
, nsAsyncCopyMode aMode
,
573 uint32_t aChunkSize
, nsAsyncCopyCallbackFun aCallback
,
574 void* aClosure
, bool aCloseSource
, bool aCloseSink
,
575 nsISupports
** aCopierCtx
,
576 nsAsyncCopyProgressFun aProgressCallback
) {
577 NS_ASSERTION(aTarget
, "non-null target required");
580 nsAStreamCopier
* copier
;
582 if (aMode
== NS_ASYNCCOPY_VIA_READSEGMENTS
) {
583 copier
= new nsStreamCopierIB();
585 copier
= new nsStreamCopierOB();
588 // Start() takes an owning ref to the copier...
590 rv
= copier
->Start(aSource
, aSink
, aTarget
, aCallback
, aClosure
, aChunkSize
,
591 aCloseSource
, aCloseSink
, aProgressCallback
);
// Cast through nsIRunnable to reach a specific nsISupports base of the copier.
594 *aCopierCtx
= static_cast<nsISupports
*>(static_cast<nsIRunnable
*>(copier
));
595 NS_ADDREF(*aCopierCtx
);
602 //-----------------------------------------------------------------------------
// Cancels a copy started by NS_AsyncCopy(). aCopierCtx is cast back to the
// copier through nsIRunnable (the reverse of the cast chain used when the
// context was handed out) and Cancel(aReason) is called on it; its result is
// returned. No null check on aCopierCtx is visible here.
604 nsresult
NS_CancelAsyncCopy(nsISupports
* aCopierCtx
, nsresult aReason
) {
605 nsAStreamCopier
* copier
=
606 static_cast<nsAStreamCopier
*>(static_cast<nsIRunnable
*>(aCopierCtx
));
607 return copier
->Cancel(aReason
);
610 //-----------------------------------------------------------------------------
// Small traits helper used by DoConsumeStream(): for each supported result
// container it provides Clear() (empty the container) and GetStorage() (a
// char* to the container's writable storage). The primary template is empty.
// NOTE(review): the `template <>` lines and closing braces of the two
// specializations are not visible in this listing.
613 template <typename T
>
614 struct ResultTraits
{};
// nsACString: Clear truncates the string; storage comes from BeginWriting().
617 struct ResultTraits
<nsACString
> {
618 static void Clear(nsACString
& aString
) { aString
.Truncate(); }
620 static char* GetStorage(nsACString
& aString
) {
621 return aString
.BeginWriting();
// nsTArray<uint8_t>: Clear empties the array; storage is Elements() viewed as
// char*.
626 struct ResultTraits
<nsTArray
<uint8_t>> {
627 static void Clear(nsTArray
<uint8_t>& aArray
) { aArray
.Clear(); }
629 static char* GetStorage(nsTArray
<uint8_t>& aArray
) {
630 return reinterpret_cast<char*>(aArray
.Elements());
// Shared implementation behind the NS_ConsumeStream() overloads: clears
// aResult, then grows it and reads from aStream into the new tail, bounded by
// aMaxCount.
// NOTE(review): this listing has lost lines -- the aResult parameter, the loop
// header, several local declarations and error checks, the loop bookkeeping
// and the return are not visible. Comments describe only what is shown.
635 template <typename T
>
636 nsresult
DoConsumeStream(nsIInputStream
* aStream
, uint32_t aMaxCount
,
// Start from an empty result.
639 ResultTraits
<T
>::Clear(aResult
);
// Ask how much is currently available; NS_BASE_STREAM_CLOSED gets special
// handling (its body is not visible here).
643 rv
= aStream
->Available(&avail64
);
645 if (rv
== NS_BASE_STREAM_CLOSED
) {
// Never read more than the caller's remaining budget.
654 uint32_t avail
= (uint32_t)XPCOM_MIN
<uint64_t>(avail64
, aMaxCount
);
656 // resize aResult buffer
657 uint32_t length
= aResult
.Length();
// Overflow-checked new length; overflow is reported as NS_ERROR_FILE_TOO_BIG.
658 CheckedInt
<uint32_t> newLength
= CheckedInt
<uint32_t>(length
) + avail
;
659 if (!newLength
.isValid()) {
660 return NS_ERROR_FILE_TOO_BIG
;
// Fallible grow; allocation failure is reported as NS_ERROR_OUT_OF_MEMORY.
663 if (!aResult
.SetLength(newLength
.value(), fallible
)) {
664 return NS_ERROR_OUT_OF_MEMORY
;
// Write position: just past the bytes that were already in aResult.
666 char* buf
= ResultTraits
<T
>::GetStorage(aResult
) + length
;
669 rv
= aStream
->Read(buf
, avail
, &n
);
// Short read: shrink aResult back to what was actually filled.
// NOTE(review): the condition guarding these two lines is not visible here;
// the strict `n < avail` assertion presumably only runs when n != avail --
// confirm against the full file.
674 MOZ_ASSERT(n
< avail
, "What happened there???");
675 aResult
.SetLength(length
+ n
);
// nsACString overload: reads from aStream into aResult, bounded by aMaxCount,
// by forwarding to DoConsumeStream() and returning its result.
686 nsresult
NS_ConsumeStream(nsIInputStream
* aStream
, uint32_t aMaxCount
,
687 nsACString
& aResult
) {
688 return DoConsumeStream(aStream
, aMaxCount
, aResult
);
// nsTArray<uint8_t> overload: reads from aStream into aResult, bounded by
// aMaxCount, by forwarding to DoConsumeStream() and returning its result.
691 nsresult
NS_ConsumeStream(nsIInputStream
* aStream
, uint32_t aMaxCount
,
692 nsTArray
<uint8_t>& aResult
) {
693 return DoConsumeStream(aStream
, aMaxCount
, aResult
);
696 //-----------------------------------------------------------------------------
// Probe callback passed to ReadSegments() by NS_InputStreamIsBuffered().
// aClosure points at a bool owned by the caller. The callback always returns
// NS_ERROR_ABORT so the stream stops calling it.
// NOTE(review): the line that stores into *result is not visible in this
// listing.
698 static nsresult
TestInputStream(nsIInputStream
* aInStr
, void* aClosure
,
699 const char* aBuffer
, uint32_t aOffset
,
700 uint32_t aCount
, uint32_t* aCountWritten
) {
701 bool* result
= static_cast<bool*>(aClosure
);
704 return NS_ERROR_ABORT
; // don't call me anymore
// Reports whether aStream is buffered. It first queries for
// nsIBufferedInputStream, then probes ReadSegments() with TestInputStream: the
// stream counts as buffered if the probe set `result` or ReadSegments returned
// anything other than NS_ERROR_NOT_IMPLEMENTED.
// NOTE(review): the handling of a successful QueryInterface and the
// declarations of `result`/`n` are not visible in this listing.
707 bool NS_InputStreamIsBuffered(nsIInputStream
* aStream
) {
708 nsCOMPtr
<nsIBufferedInputStream
> bufferedIn
= do_QueryInterface(aStream
);
715 nsresult rv
= aStream
->ReadSegments(TestInputStream
, &result
, 1, &n
);
716 return result
|| rv
!= NS_ERROR_NOT_IMPLEMENTED
;
// Probe callback passed to WriteSegments() by NS_OutputStreamIsBuffered().
// aClosure points at a bool owned by the caller. The callback always returns
// NS_ERROR_ABORT so the stream stops calling it.
// NOTE(review): the line that stores into *result is not visible in this
// listing.
719 static nsresult
TestOutputStream(nsIOutputStream
* aOutStr
, void* aClosure
,
720 char* aBuffer
, uint32_t aOffset
,
721 uint32_t aCount
, uint32_t* aCountRead
) {
722 bool* result
= static_cast<bool*>(aClosure
);
725 return NS_ERROR_ABORT
; // don't call me anymore
// Reports whether aStream is buffered. It queries for nsIBufferedOutputStream
// and probes WriteSegments() with TestOutputStream.
// NOTE(review): the handling of a successful QueryInterface, the declarations
// of `result`/`n` and the return expression are not visible in this listing.
728 bool NS_OutputStreamIsBuffered(nsIOutputStream
* aStream
) {
729 nsCOMPtr
<nsIBufferedOutputStream
> bufferedOut
= do_QueryInterface(aStream
);
736 aStream
->WriteSegments(TestOutputStream
, &result
, 1, &n
);
740 //-----------------------------------------------------------------------------
// Segment callback that forwards a segment to an output stream: aClosure is
// treated as an nsIOutputStream* and aBuffer/aCount are passed to its Write().
// NOTE(review): the surrounding loop/bookkeeping (declaration of `n`, updates
// to *aCountWritten, return) is not visible in this listing.
742 nsresult
NS_CopySegmentToStream(nsIInputStream
* aInStr
, void* aClosure
,
743 const char* aBuffer
, uint32_t aOffset
,
744 uint32_t aCount
, uint32_t* aCountWritten
) {
745 nsIOutputStream
* outStr
= static_cast<nsIOutputStream
*>(aClosure
);
749 nsresult rv
= outStr
->Write(aBuffer
, aCount
, &n
);
// Segment callback (input-stream flavour) that copies a segment into a flat
// buffer: aClosure is treated as the destination char buffer, aCount bytes of
// aBuffer are copied to offset aOffset, and the full count is reported as
// written. No bounds check on the destination is visible here, so the caller
// must supply a buffer large enough for aOffset + aCount.
// NOTE(review): the return statement is not visible in this listing.
760 nsresult
NS_CopySegmentToBuffer(nsIInputStream
* aInStr
, void* aClosure
,
761 const char* aBuffer
, uint32_t aOffset
,
762 uint32_t aCount
, uint32_t* aCountWritten
) {
763 char* toBuf
= static_cast<char*>(aClosure
);
764 memcpy(&toBuf
[aOffset
], aBuffer
, aCount
);
765 *aCountWritten
= aCount
;
// Segment callback (output-stream flavour) that fills a segment from a flat
// buffer: aClosure is treated as the source char buffer, aCount bytes starting
// at offset aOffset are copied into aBuffer, and the full count is reported as
// read. No bounds check on the source is visible here, so the caller must
// supply a buffer covering aOffset + aCount.
// NOTE(review): the return statement is not visible in this listing.
769 nsresult
NS_CopySegmentToBuffer(nsIOutputStream
* aOutStr
, void* aClosure
,
770 char* aBuffer
, uint32_t aOffset
,
771 uint32_t aCount
, uint32_t* aCountRead
) {
772 const char* fromBuf
= static_cast<const char*>(aClosure
);
773 memcpy(aBuffer
, &fromBuf
[aOffset
], aCount
);
774 *aCountRead
= aCount
;
// Segment callback that throws the data away: it touches neither aBuffer nor
// aClosure and simply reports the whole segment (aCount bytes) as consumed.
// NOTE(review): the return statement is not visible in this listing.
778 nsresult
NS_DiscardSegment(nsIInputStream
* aInStr
, void* aClosure
,
779 const char* aBuffer
, uint32_t aOffset
,
780 uint32_t aCount
, uint32_t* aCountWritten
) {
781 *aCountWritten
= aCount
;
785 //-----------------------------------------------------------------------------
// Segment callback that redirects to another segment function: aClosure is
// treated as an nsWriteSegmentThunk, and its mFun is invoked with the thunk's
// own mStream and mClosure in place of aInStr/aClosure, followed by the
// original buffer arguments. The result of mFun is returned.
// NOTE(review): the tail of the call's argument list is not visible in this
// listing.
787 nsresult
NS_WriteSegmentThunk(nsIInputStream
* aInStr
, void* aClosure
,
788 const char* aBuffer
, uint32_t aOffset
,
789 uint32_t aCount
, uint32_t* aCountWritten
) {
790 nsWriteSegmentThunk
* thunk
= static_cast<nsWriteSegmentThunk
*>(aClosure
);
791 return thunk
->mFun(thunk
->mStream
, thunk
->mClosure
, aBuffer
, aOffset
, aCount
,
// Refills aDest from aInput while keeping the last aKeep bytes already in it.
// The kept bytes are moved to the front, the stream is read into the rest of
// the array's capacity, and the length is set to aKeep + *aNewBytes.
// aKeep must not exceed aDest.Length() (asserted).
// NOTE(review): the capture/handling of Read()'s result and the return are not
// visible in this listing.
795 nsresult
NS_FillArray(FallibleTArray
<char>& aDest
, nsIInputStream
* aInput
,
796 uint32_t aKeep
, uint32_t* aNewBytes
) {
797 MOZ_ASSERT(aInput
, "null stream");
798 MOZ_ASSERT(aKeep
<= aDest
.Length(), "illegal keep count");
800 char* aBuffer
= aDest
.Elements();
// Offset of the first byte to keep; the move is skipped when there is nothing
// to keep or the kept bytes already sit at the front.
801 int64_t keepOffset
= int64_t(aDest
.Length()) - aKeep
;
802 if (aKeep
!= 0 && keepOffset
> 0) {
// memmove because source and destination ranges may overlap.
803 memmove(aBuffer
, aBuffer
+ keepOffset
, aKeep
);
// Read into everything past the kept prefix, up to the array's capacity.
807 aInput
->Read(aBuffer
+ aKeep
, aDest
.Capacity() - aKeep
, aNewBytes
);
811 // NOTE: we rely on the fact that the new slots are NOT initialized by
812 // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
814 aDest
.SetLengthAndRetainStorage(aKeep
+ *aNewBytes
);
816 MOZ_ASSERT(aDest
.Length() <= aDest
.Capacity(), "buffer overflow");
// Returns true when aSource implements nsICloneableInputStream and its
// GetCloneable() reports true.
// NOTE(review): any null check on aSource is not visible in this listing.
820 bool NS_InputStreamIsCloneable(nsIInputStream
* aSource
) {
825 nsCOMPtr
<nsICloneableInputStream
> cloneable
= do_QueryInterface(aSource
);
826 return cloneable
&& cloneable
->GetCloneable();
// Clones aSource into *aCloneOut.
//  - If aSource is directly cloneable, the clone is made from it and
//    *aReplacementOut (when provided) is set to null.
//  - Otherwise, if aReplacementOut is null, the call fails with
//    NS_ERROR_FAILURE.
//  - Otherwise aSource is copied asynchronously into a pipe; the pipe's reader
//    is returned through *aReplacementOut and a clone of that reader through
//    *aCloneOut.
// A null aSource yields NS_ERROR_FAILURE.
// NOTE(review): the bodies of the NS_FAILED checks and the final return are
// not visible in this listing.
829 nsresult
NS_CloneInputStream(nsIInputStream
* aSource
,
830 nsIInputStream
** aCloneOut
,
831 nsIInputStream
** aReplacementOut
) {
832 if (NS_WARN_IF(!aSource
)) {
833 return NS_ERROR_FAILURE
;
836 // Attempt to perform the clone directly on the source stream
837 nsCOMPtr
<nsICloneableInputStream
> cloneable
= do_QueryInterface(aSource
);
838 if (cloneable
&& cloneable
->GetCloneable()) {
839 if (aReplacementOut
) {
840 *aReplacementOut
= nullptr;
842 return cloneable
->Clone(aCloneOut
);
845 // If we failed the clone and the caller does not want to replace their
846 // original stream, then we are done. Return error.
847 if (!aReplacementOut
) {
848 return NS_ERROR_FAILURE
;
851 // The caller has opted-in to the fallback clone support that replaces
852 // the original stream. Copy the data to a pipe and return two cloned
855 nsCOMPtr
<nsIInputStream
> reader
;
856 nsCOMPtr
<nsIInputStream
> readerClone
;
857 nsCOMPtr
<nsIOutputStream
> writer
;
859 nsresult rv
= NS_NewPipe(getter_AddRefs(reader
), getter_AddRefs(writer
), 0,
860 0, // default segment size and max size
861 true, true); // non-blocking
862 if (NS_WARN_IF(NS_FAILED(rv
))) {
// The pipe's reader is expected to be cloneable (asserted below).
866 cloneable
= do_QueryInterface(reader
);
867 MOZ_ASSERT(cloneable
&& cloneable
->GetCloneable());
869 rv
= cloneable
->Clone(getter_AddRefs(readerClone
));
870 if (NS_WARN_IF(NS_FAILED(rv
))) {
// The stream transport service is used as the event target for the copy.
874 nsCOMPtr
<nsIEventTarget
> target
=
875 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID
, &rv
);
876 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Pump the original stream into the pipe's writer in the background.
880 rv
= NS_AsyncCopy(aSource
, writer
, target
, NS_ASYNCCOPY_VIA_WRITESEGMENTS
);
881 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Hand ownership of both readers to the caller.
885 readerClone
.forget(aCloneOut
);
886 reader
.forget(aReplacementOut
);
// Produces an nsIAsyncInputStream for aSource and stores it in
// *aAsyncInputStream. Visible cases:
//  - aSource is already non-blocking and async: it is returned as is.
//  - aSource is non-blocking but not async: it is wrapped with
//    NonBlockingAsyncInputStream::Create().
//  - otherwise: an input transport is created through the stream transport
//    service and its opened input stream (queried for the async interface) is
//    returned.
// Takes ownership of aSource. A null aAsyncInputStream yields
// NS_ERROR_FAILURE.
// NOTE(review): the listing has lost lines -- the bodies of the NS_FAILED
// checks, the branch structure around the wrapping call, the remaining
// arguments of Create() and the final return are not visible.
891 nsresult
NS_MakeAsyncNonBlockingInputStream(
892 already_AddRefed
<nsIInputStream
> aSource
,
893 nsIAsyncInputStream
** aAsyncInputStream
, bool aCloseWhenDone
,
894 uint32_t aFlags
, uint32_t aSegmentSize
, uint32_t aSegmentCount
) {
// Take ownership first so the reference is released on every exit path.
895 nsCOMPtr
<nsIInputStream
> source
= std::move(aSource
);
896 if (NS_WARN_IF(!aAsyncInputStream
)) {
897 return NS_ERROR_FAILURE
;
900 bool nonBlocking
= false;
901 nsresult rv
= source
->IsNonBlocking(&nonBlocking
);
902 if (NS_WARN_IF(NS_FAILED(rv
))) {
906 nsCOMPtr
<nsIAsyncInputStream
> asyncStream
= do_QueryInterface(source
);
908 if (nonBlocking
&& asyncStream
) {
909 // This stream is perfect!
910 asyncStream
.forget(aAsyncInputStream
);
915 // If the stream is non-blocking but not async, we wrap it.
916 return NonBlockingAsyncInputStream::Create(source
.forget(),
// Remaining case: go through the stream transport service.
920 nsCOMPtr
<nsIStreamTransportService
> sts
=
921 do_GetService(kStreamTransportServiceCID
, &rv
);
922 if (NS_WARN_IF(NS_FAILED(rv
))) {
926 nsCOMPtr
<nsITransport
> transport
;
927 rv
= sts
->CreateInputTransport(source
, aCloseWhenDone
,
928 getter_AddRefs(transport
));
929 if (NS_WARN_IF(NS_FAILED(rv
))) {
933 nsCOMPtr
<nsIInputStream
> wrapper
;
934 rv
= transport
->OpenInputStream(aFlags
, aSegmentSize
, aSegmentCount
,
935 getter_AddRefs(wrapper
));
936 if (NS_WARN_IF(NS_FAILED(rv
))) {
// The transport's stream is expected to implement the async interface
// (asserted below).
940 asyncStream
= do_QueryInterface(wrapper
);
941 MOZ_ASSERT(asyncStream
);
943 asyncStream
.forget(aAsyncInputStream
);