1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/Mutex.h"
8 #include "mozilla/Attributes.h"
9 #include "nsStreamUtils.h"
10 #include "nsAutoPtr.h"
13 #include "nsIEventTarget.h"
14 #include "nsIRunnable.h"
15 #include "nsISafeOutputStream.h"
17 #include "nsIAsyncInputStream.h"
18 #include "nsIAsyncOutputStream.h"
19 #include "nsIBufferedStreams.h"
21 using namespace mozilla
;
23 //-----------------------------------------------------------------------------
25 class nsInputStreamReadyEvent MOZ_FINAL
27 , public nsIInputStreamCallback
30 NS_DECL_THREADSAFE_ISUPPORTS
32 nsInputStreamReadyEvent(nsIInputStreamCallback
* aCallback
,
33 nsIEventTarget
* aTarget
)
34 : mCallback(aCallback
)
40 ~nsInputStreamReadyEvent()
46 // whoa!! looks like we never posted this event. take care to
47 // release mCallback on the correct thread. if mTarget lives on the
48 // calling thread, then we are ok. otherwise, we have to try to
49 // proxy the Release over the right thread. if that thread is dead,
50 // then there's nothing we can do... better to leak than crash.
53 nsresult rv
= mTarget
->IsOnCurrentThread(&val
);
54 if (NS_FAILED(rv
) || !val
) {
55 nsCOMPtr
<nsIInputStreamCallback
> event
=
56 NS_NewInputStreamReadyEvent(mCallback
, mTarget
);
59 rv
= event
->OnInputStreamReady(nullptr);
61 NS_NOTREACHED("leaking stream event");
62 nsISupports
* sup
= event
;
70 NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream
* aStream
)
75 mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
77 NS_WARNING("Dispatch failed");
78 return NS_ERROR_FAILURE
;
88 mCallback
->OnInputStreamReady(mStream
);
96 nsCOMPtr
<nsIAsyncInputStream
> mStream
;
97 nsCOMPtr
<nsIInputStreamCallback
> mCallback
;
98 nsCOMPtr
<nsIEventTarget
> mTarget
;
101 NS_IMPL_ISUPPORTS(nsInputStreamReadyEvent
, nsIRunnable
,
102 nsIInputStreamCallback
)
104 //-----------------------------------------------------------------------------
106 class nsOutputStreamReadyEvent MOZ_FINAL
108 , public nsIOutputStreamCallback
111 NS_DECL_THREADSAFE_ISUPPORTS
113 nsOutputStreamReadyEvent(nsIOutputStreamCallback
* aCallback
,
114 nsIEventTarget
* aTarget
)
115 : mCallback(aCallback
)
121 ~nsOutputStreamReadyEvent()
127 // whoa!! looks like we never posted this event. take care to
128 // release mCallback on the correct thread. if mTarget lives on the
129 // calling thread, then we are ok. otherwise, we have to try to
130 // proxy the Release over the right thread. if that thread is dead,
131 // then there's nothing we can do... better to leak than crash.
134 nsresult rv
= mTarget
->IsOnCurrentThread(&val
);
135 if (NS_FAILED(rv
) || !val
) {
136 nsCOMPtr
<nsIOutputStreamCallback
> event
=
137 NS_NewOutputStreamReadyEvent(mCallback
, mTarget
);
140 rv
= event
->OnOutputStreamReady(nullptr);
142 NS_NOTREACHED("leaking stream event");
143 nsISupports
* sup
= event
;
151 NS_IMETHOD
OnOutputStreamReady(nsIAsyncOutputStream
* aStream
)
156 mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
158 NS_WARNING("PostEvent failed");
159 return NS_ERROR_FAILURE
;
169 mCallback
->OnOutputStreamReady(mStream
);
177 nsCOMPtr
<nsIAsyncOutputStream
> mStream
;
178 nsCOMPtr
<nsIOutputStreamCallback
> mCallback
;
179 nsCOMPtr
<nsIEventTarget
> mTarget
;
182 NS_IMPL_ISUPPORTS(nsOutputStreamReadyEvent
, nsIRunnable
,
183 nsIOutputStreamCallback
)
185 //-----------------------------------------------------------------------------
187 already_AddRefed
<nsIInputStreamCallback
>
188 NS_NewInputStreamReadyEvent(nsIInputStreamCallback
* aCallback
,
189 nsIEventTarget
* aTarget
)
191 NS_ASSERTION(aCallback
, "null callback");
192 NS_ASSERTION(aTarget
, "null target");
193 nsRefPtr
<nsInputStreamReadyEvent
> ev
=
194 new nsInputStreamReadyEvent(aCallback
, aTarget
);
198 already_AddRefed
<nsIOutputStreamCallback
>
199 NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback
* aCallback
,
200 nsIEventTarget
* aTarget
)
202 NS_ASSERTION(aCallback
, "null callback");
203 NS_ASSERTION(aTarget
, "null target");
204 nsRefPtr
<nsOutputStreamReadyEvent
> ev
=
205 new nsOutputStreamReadyEvent(aCallback
, aTarget
);
209 //-----------------------------------------------------------------------------
210 // NS_AsyncCopy implementation
212 // abstract stream copier...
213 class nsAStreamCopier
214 : public nsIInputStreamCallback
215 , public nsIOutputStreamCallback
219 NS_DECL_THREADSAFE_ISUPPORTS
222 : mLock("nsAStreamCopier.mLock")
224 , mProgressCallback(nullptr)
227 , mEventInProcess(false)
228 , mEventIsPending(false)
232 , mCancelStatus(NS_OK
)
236 // kick off the async copy...
237 nsresult
Start(nsIInputStream
* aSource
,
238 nsIOutputStream
* aSink
,
239 nsIEventTarget
* aTarget
,
240 nsAsyncCopyCallbackFun aCallback
,
245 nsAsyncCopyProgressFun aProgressCallback
)
250 mCallback
= aCallback
;
252 mChunkSize
= aChunksize
;
253 mCloseSource
= aCloseSource
;
254 mCloseSink
= aCloseSink
;
255 mProgressCallback
= aProgressCallback
;
257 mAsyncSource
= do_QueryInterface(mSource
);
258 mAsyncSink
= do_QueryInterface(mSink
);
260 return PostContinuationEvent();
263 // implemented by subclasses, returns number of bytes copied and
264 // sets source and sink condition before returning.
265 virtual uint32_t DoCopy(nsresult
* aSourceCondition
,
266 nsresult
* aSinkCondition
) = 0;
270 if (!mSource
|| !mSink
) {
274 nsresult sourceCondition
, sinkCondition
;
275 nsresult cancelStatus
;
278 MutexAutoLock
lock(mLock
);
279 canceled
= mCanceled
;
280 cancelStatus
= mCancelStatus
;
283 // Copy data from the source to the sink until we hit failure or have
284 // copied all the data.
286 // Note: copyFailed will be true if the source or the sink have
287 // reported an error, or if we failed to write any bytes
288 // because we have consumed all of our data.
289 bool copyFailed
= false;
291 uint32_t n
= DoCopy(&sourceCondition
, &sinkCondition
);
292 if (n
> 0 && mProgressCallback
) {
293 mProgressCallback(mClosure
, n
);
295 copyFailed
= NS_FAILED(sourceCondition
) ||
296 NS_FAILED(sinkCondition
) || n
== 0;
298 MutexAutoLock
lock(mLock
);
299 canceled
= mCanceled
;
300 cancelStatus
= mCancelStatus
;
302 if (copyFailed
&& !canceled
) {
303 if (sourceCondition
== NS_BASE_STREAM_WOULD_BLOCK
&& mAsyncSource
) {
304 // need to wait for more data from source. while waiting for
305 // more source data, be sure to observe failures on output end.
306 mAsyncSource
->AsyncWait(this, 0, 0, nullptr);
309 mAsyncSink
->AsyncWait(this,
310 nsIAsyncOutputStream::WAIT_CLOSURE_ONLY
,
313 } else if (sinkCondition
== NS_BASE_STREAM_WOULD_BLOCK
&& mAsyncSink
) {
314 // need to wait for more room in the sink. while waiting for
315 // more room in the sink, be sure to observer failures on the
317 mAsyncSink
->AsyncWait(this, 0, 0, nullptr);
320 mAsyncSource
->AsyncWait(this,
321 nsIAsyncInputStream::WAIT_CLOSURE_ONLY
,
326 if (copyFailed
|| canceled
) {
330 mAsyncSource
->CloseWithStatus(
331 canceled
? cancelStatus
: sinkCondition
);
336 mAsyncSource
= nullptr;
342 mAsyncSink
->CloseWithStatus(
343 canceled
? cancelStatus
: sourceCondition
);
345 // If we have an nsISafeOutputStream, and our
346 // sourceCondition and sinkCondition are not set to a
347 // failure state, finish writing.
348 nsCOMPtr
<nsISafeOutputStream
> sostream
=
349 do_QueryInterface(mSink
);
350 if (sostream
&& NS_SUCCEEDED(sourceCondition
) &&
351 NS_SUCCEEDED(sinkCondition
)) {
358 mAsyncSink
= nullptr;
361 // notify state complete...
365 status
= sourceCondition
;
366 if (NS_SUCCEEDED(status
)) {
367 status
= sinkCondition
;
369 if (status
== NS_BASE_STREAM_CLOSED
) {
373 status
= cancelStatus
;
375 mCallback(mClosure
, status
);
382 nsresult
Cancel(nsresult aReason
)
384 MutexAutoLock
lock(mLock
);
386 return NS_ERROR_FAILURE
;
389 if (NS_SUCCEEDED(aReason
)) {
390 NS_WARNING("cancel with non-failure status code");
391 aReason
= NS_BASE_STREAM_CLOSED
;
395 mCancelStatus
= aReason
;
399 NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream
* aSource
)
401 PostContinuationEvent();
405 NS_IMETHOD
OnOutputStreamReady(nsIAsyncOutputStream
* aSink
)
407 PostContinuationEvent();
411 // continuation event handler
416 // clear "in process" flag and post any pending continuation event
417 MutexAutoLock
lock(mLock
);
418 mEventInProcess
= false;
419 if (mEventIsPending
) {
420 mEventIsPending
= false;
421 PostContinuationEvent_Locked();
427 nsresult
PostContinuationEvent()
429 // we cannot post a continuation event if there is currently
430 // an event in process. doing so could result in Process being
431 // run simultaneously on multiple threads, so we mark the event
432 // as pending, and if an event is already in process then we
433 // just let that existing event take care of posting the real
434 // continuation event.
436 MutexAutoLock
lock(mLock
);
437 return PostContinuationEvent_Locked();
440 nsresult
PostContinuationEvent_Locked()
443 if (mEventInProcess
) {
444 mEventIsPending
= true;
446 rv
= mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
447 if (NS_SUCCEEDED(rv
)) {
448 mEventInProcess
= true;
450 NS_WARNING("unable to post continuation event");
457 nsCOMPtr
<nsIInputStream
> mSource
;
458 nsCOMPtr
<nsIOutputStream
> mSink
;
459 nsCOMPtr
<nsIAsyncInputStream
> mAsyncSource
;
460 nsCOMPtr
<nsIAsyncOutputStream
> mAsyncSink
;
461 nsCOMPtr
<nsIEventTarget
> mTarget
;
463 nsAsyncCopyCallbackFun mCallback
;
464 nsAsyncCopyProgressFun mProgressCallback
;
467 bool mEventInProcess
;
468 bool mEventIsPending
;
472 nsresult mCancelStatus
;
474 // virtual since subclasses call superclass Release()
475 virtual ~nsAStreamCopier()
480 NS_IMPL_ISUPPORTS(nsAStreamCopier
,
481 nsIInputStreamCallback
,
482 nsIOutputStreamCallback
,
485 class nsStreamCopierIB MOZ_FINAL
: public nsAStreamCopier
488 nsStreamCopierIB() : nsAStreamCopier()
491 virtual ~nsStreamCopierIB()
495 struct ReadSegmentsState
497 nsIOutputStream
* mSink
;
498 nsresult mSinkCondition
;
501 static NS_METHOD
ConsumeInputBuffer(nsIInputStream
* aInStr
,
506 uint32_t* aCountWritten
)
508 ReadSegmentsState
* state
= (ReadSegmentsState
*)aClosure
;
510 nsresult rv
= state
->mSink
->Write(aBuffer
, aCount
, aCountWritten
);
512 state
->mSinkCondition
= rv
;
513 } else if (*aCountWritten
== 0) {
514 state
->mSinkCondition
= NS_BASE_STREAM_CLOSED
;
517 return state
->mSinkCondition
;
520 uint32_t DoCopy(nsresult
* aSourceCondition
, nsresult
* aSinkCondition
)
522 ReadSegmentsState state
;
524 state
.mSinkCondition
= NS_OK
;
528 mSource
->ReadSegments(ConsumeInputBuffer
, &state
, mChunkSize
, &n
);
529 *aSinkCondition
= state
.mSinkCondition
;
534 class nsStreamCopierOB MOZ_FINAL
: public nsAStreamCopier
537 nsStreamCopierOB() : nsAStreamCopier()
540 virtual ~nsStreamCopierOB()
544 struct WriteSegmentsState
546 nsIInputStream
* mSource
;
547 nsresult mSourceCondition
;
550 static NS_METHOD
FillOutputBuffer(nsIOutputStream
* aOutStr
,
555 uint32_t* aCountRead
)
557 WriteSegmentsState
* state
= (WriteSegmentsState
*)aClosure
;
559 nsresult rv
= state
->mSource
->Read(aBuffer
, aCount
, aCountRead
);
561 state
->mSourceCondition
= rv
;
562 } else if (*aCountRead
== 0) {
563 state
->mSourceCondition
= NS_BASE_STREAM_CLOSED
;
566 return state
->mSourceCondition
;
569 uint32_t DoCopy(nsresult
* aSourceCondition
, nsresult
* aSinkCondition
)
571 WriteSegmentsState state
;
572 state
.mSource
= mSource
;
573 state
.mSourceCondition
= NS_OK
;
577 mSink
->WriteSegments(FillOutputBuffer
, &state
, mChunkSize
, &n
);
578 *aSourceCondition
= state
.mSourceCondition
;
583 //-----------------------------------------------------------------------------
586 NS_AsyncCopy(nsIInputStream
* aSource
,
587 nsIOutputStream
* aSink
,
588 nsIEventTarget
* aTarget
,
589 nsAsyncCopyMode aMode
,
591 nsAsyncCopyCallbackFun aCallback
,
595 nsISupports
** aCopierCtx
,
596 nsAsyncCopyProgressFun aProgressCallback
)
598 NS_ASSERTION(aTarget
, "non-null target required");
601 nsAStreamCopier
* copier
;
603 if (aMode
== NS_ASYNCCOPY_VIA_READSEGMENTS
) {
604 copier
= new nsStreamCopierIB();
606 copier
= new nsStreamCopierOB();
610 return NS_ERROR_OUT_OF_MEMORY
;
613 // Start() takes an owning ref to the copier...
615 rv
= copier
->Start(aSource
, aSink
, aTarget
, aCallback
, aClosure
, aChunkSize
,
616 aCloseSource
, aCloseSink
, aProgressCallback
);
619 *aCopierCtx
= static_cast<nsISupports
*>(static_cast<nsIRunnable
*>(copier
));
620 NS_ADDREF(*aCopierCtx
);
627 //-----------------------------------------------------------------------------
630 NS_CancelAsyncCopy(nsISupports
* aCopierCtx
, nsresult aReason
)
632 nsAStreamCopier
* copier
=
633 static_cast<nsAStreamCopier
*>(static_cast<nsIRunnable
*>(aCopierCtx
));
634 return copier
->Cancel(aReason
);
637 //-----------------------------------------------------------------------------
640 NS_ConsumeStream(nsIInputStream
* aStream
, uint32_t aMaxCount
,
648 rv
= aStream
->Available(&avail64
);
650 if (rv
== NS_BASE_STREAM_CLOSED
) {
659 uint32_t avail
= (uint32_t)XPCOM_MIN
<uint64_t>(avail64
, aMaxCount
);
661 // resize aResult buffer
662 uint32_t length
= aResult
.Length();
663 if (avail
> UINT32_MAX
- length
) {
664 return NS_ERROR_FILE_TOO_BIG
;
667 aResult
.SetLength(length
+ avail
);
668 if (aResult
.Length() != (length
+ avail
)) {
669 return NS_ERROR_OUT_OF_MEMORY
;
671 char* buf
= aResult
.BeginWriting() + length
;
674 rv
= aStream
->Read(buf
, avail
, &n
);
679 aResult
.SetLength(length
+ n
);
690 //-----------------------------------------------------------------------------
693 TestInputStream(nsIInputStream
* aInStr
,
698 uint32_t* aCountWritten
)
700 bool* result
= static_cast<bool*>(aClosure
);
702 return NS_ERROR_ABORT
; // don't call me anymore
706 NS_InputStreamIsBuffered(nsIInputStream
* aStream
)
708 nsCOMPtr
<nsIBufferedInputStream
> bufferedIn
= do_QueryInterface(aStream
);
715 nsresult rv
= aStream
->ReadSegments(TestInputStream
, &result
, 1, &n
);
716 return result
|| NS_SUCCEEDED(rv
);
720 TestOutputStream(nsIOutputStream
* aOutStr
,
725 uint32_t* aCountRead
)
727 bool* result
= static_cast<bool*>(aClosure
);
729 return NS_ERROR_ABORT
; // don't call me anymore
733 NS_OutputStreamIsBuffered(nsIOutputStream
* aStream
)
735 nsCOMPtr
<nsIBufferedOutputStream
> bufferedOut
= do_QueryInterface(aStream
);
742 aStream
->WriteSegments(TestOutputStream
, &result
, 1, &n
);
746 //-----------------------------------------------------------------------------
749 NS_CopySegmentToStream(nsIInputStream
* aInStr
,
754 uint32_t* aCountWritten
)
756 nsIOutputStream
* outStr
= static_cast<nsIOutputStream
*>(aClosure
);
760 nsresult rv
= outStr
->Write(aBuffer
, aCount
, &n
);
772 NS_CopySegmentToBuffer(nsIInputStream
* aInStr
,
777 uint32_t* aCountWritten
)
779 char* toBuf
= static_cast<char*>(aClosure
);
780 memcpy(&toBuf
[aOffset
], aBuffer
, aCount
);
781 *aCountWritten
= aCount
;
786 NS_CopySegmentToBuffer(nsIOutputStream
* aOutStr
,
791 uint32_t* aCountRead
)
793 const char* fromBuf
= static_cast<const char*>(aClosure
);
794 memcpy(aBuffer
, &fromBuf
[aOffset
], aCount
);
795 *aCountRead
= aCount
;
800 NS_DiscardSegment(nsIInputStream
* aInStr
,
805 uint32_t* aCountWritten
)
807 *aCountWritten
= aCount
;
811 //-----------------------------------------------------------------------------
814 NS_WriteSegmentThunk(nsIInputStream
* aInStr
,
819 uint32_t* aCountWritten
)
821 nsWriteSegmentThunk
* thunk
= static_cast<nsWriteSegmentThunk
*>(aClosure
);
822 return thunk
->mFun(thunk
->mStream
, thunk
->mClosure
, aBuffer
, aOffset
, aCount
,
827 NS_FillArray(FallibleTArray
<char>& aDest
, nsIInputStream
* aInput
,
828 uint32_t aKeep
, uint32_t* aNewBytes
)
830 MOZ_ASSERT(aInput
, "null stream");
831 MOZ_ASSERT(aKeep
<= aDest
.Length(), "illegal keep count");
833 char* aBuffer
= aDest
.Elements();
834 int64_t keepOffset
= int64_t(aDest
.Length()) - aKeep
;
835 if (aKeep
!= 0 && keepOffset
> 0) {
836 memmove(aBuffer
, aBuffer
+ keepOffset
, aKeep
);
840 aInput
->Read(aBuffer
+ aKeep
, aDest
.Capacity() - aKeep
, aNewBytes
);
844 // NOTE: we rely on the fact that the new slots are NOT initialized by
845 // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
847 aDest
.SetLengthAndRetainStorage(aKeep
+ *aNewBytes
);
849 MOZ_ASSERT(aDest
.Length() <= aDest
.Capacity(), "buffer overflow");