1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 #include "nsBufferedStreams.h"
7 #include "nsStreamUtils.h"
9 #include "nsIClassInfoImpl.h"
10 #include "nsIEventTarget.h"
11 #include "nsThreadUtils.h"
12 #include "mozilla/DebugOnly.h"
13 #include "mozilla/ipc/InputStreamUtils.h"
23 # define MAX_BIG_SEEKS 20
26 uint32_t mSeeksWithinBuffer
;
27 uint32_t mSeeksOutsideBuffer
;
28 uint32_t mBufferReadUponSeek
;
29 uint32_t mBufferUnreadUponSeek
;
30 uint32_t mBytesReadFromBuffer
;
31 uint32_t mBigSeekIndex
;
35 } mBigSeek
[MAX_BIG_SEEKS
];
38 # define METER(x) /* nothing */
41 using namespace mozilla::ipc
;
42 using namespace mozilla
;
44 ////////////////////////////////////////////////////////////////////////////////
// Destructor: tears the stream down by calling nsBufferedStream::Close(),
// which (per its own comment) drops the reference taken in Init().
47 nsBufferedStream::~nsBufferedStream() { Close(); }
49 NS_IMPL_ADDREF(nsBufferedStream
)
50 NS_IMPL_RELEASE(nsBufferedStream
)
52 NS_INTERFACE_MAP_BEGIN(nsBufferedStream
)
53 NS_INTERFACE_MAP_ENTRY(nsISupports
)
54 NS_INTERFACE_MAP_ENTRY(nsITellableStream
)
55 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream
, mSeekable
)
58 nsresult
nsBufferedStream::Init(nsISupports
* aStream
, uint32_t bufferSize
) {
59 NS_ASSERTION(aStream
, "need to supply a stream");
60 NS_ASSERTION(mStream
== nullptr, "already inited");
61 mStream
= aStream
; // we keep a reference until nsBufferedStream::Close
62 mBufferSize
= bufferSize
;
63 mBufferStartOffset
= 0;
65 nsCOMPtr
<nsISeekableStream
> seekable
= do_QueryInterface(mStream
);
67 RecursiveMutexAutoLock
lock(mBufferMutex
);
68 mBuffer
= new (mozilla::fallible
) char[bufferSize
];
69 if (mBuffer
== nullptr) {
70 return NS_ERROR_OUT_OF_MEMORY
;
75 void nsBufferedStream::Close() {
76 // Drop the reference from nsBufferedStream::Init()
78 RecursiveMutexAutoLock
lock(mBufferMutex
);
83 mBufferStartOffset
= 0;
91 tfp
= fopen("/tmp/bufstats", "w");
93 setvbuf(tfp
, nullptr, _IOLBF
, 0);
97 fprintf(tfp
, "seeks within buffer: %u\n", bufstats
.mSeeksWithinBuffer
);
98 fprintf(tfp
, "seeks outside buffer: %u\n",
99 bufstats
.mSeeksOutsideBuffer
);
100 fprintf(tfp
, "buffer read on seek: %u\n",
101 bufstats
.mBufferReadUponSeek
);
102 fprintf(tfp
, "buffer unread on seek: %u\n",
103 bufstats
.mBufferUnreadUponSeek
);
104 fprintf(tfp
, "bytes read from buffer: %u\n",
105 bufstats
.mBytesReadFromBuffer
);
106 for (uint32_t i
= 0; i
< bufstats
.mBigSeekIndex
; i
++) {
107 fprintf(tfp
, "bigseek[%u] = {old: %u, new: %u}\n", i
,
108 bufstats
.mBigSeek
[i
].mOldOffset
,
109 bufstats
.mBigSeek
[i
].mNewOffset
);
117 nsBufferedStream::Seek(int32_t whence
, int64_t offset
) {
118 if (mStream
== nullptr) {
119 return NS_BASE_STREAM_CLOSED
;
122 // If the underlying stream isn't a random access store, then fail early.
123 // We could possibly succeed for the case where the seek position denotes
124 // something that happens to be read into the buffer, but that would make
125 // the failure data-dependent.
127 nsCOMPtr
<nsISeekableStream
> ras
= do_QueryInterface(mStream
, &rv
);
129 NS_WARNING("mStream doesn't QI to nsISeekableStream");
135 case nsISeekableStream::NS_SEEK_SET
:
138 case nsISeekableStream::NS_SEEK_CUR
:
139 absPos
= mBufferStartOffset
;
143 case nsISeekableStream::NS_SEEK_END
:
147 MOZ_ASSERT_UNREACHABLE("bogus seek whence parameter");
148 return NS_ERROR_UNEXPECTED
;
151 // Let mCursor point into the existing buffer if the new position is
152 // between the current cursor and the mFillPoint "fencepost" -- the
153 // client may never get around to a Read or Write after this Seek.
154 // Read and Write worry about flushing and filling in that event.
155 // But if we're at EOF, make sure to pass the seek through to the
156 // underlying stream, because it may have auto-closed itself and
158 uint32_t offsetInBuffer
= uint32_t(absPos
- mBufferStartOffset
);
159 if (offsetInBuffer
<= mFillPoint
&& !mEOF
) {
160 METER(bufstats
.mSeeksWithinBuffer
++);
161 mCursor
= offsetInBuffer
;
165 METER(bufstats
.mSeeksOutsideBuffer
++);
166 METER(bufstats
.mBufferReadUponSeek
+= mCursor
);
167 METER(bufstats
.mBufferUnreadUponSeek
+= mFillPoint
- mCursor
);
172 "(debug) Flush returned error within nsBufferedStream::Seek, so we "
178 rv
= ras
->Seek(whence
, offset
);
182 "(debug) Error: ras->Seek() returned error within "
183 "nsBufferedStream::Seek, so we exit early.");
190 // Recompute whether the offset we're seeking to is in our buffer.
191 // Note that we need to recompute because Flush() might have
192 // changed mBufferStartOffset.
193 offsetInBuffer
= uint32_t(absPos
- mBufferStartOffset
);
194 if (offsetInBuffer
<= mFillPoint
) {
195 // It's safe to just set mCursor to offsetInBuffer. In particular, we
196 // want to avoid calling Fill() here since we already have the data that
197 // was seeked to and calling Fill() might auto-close our underlying
198 // stream in some cases.
199 mCursor
= offsetInBuffer
;
203 METER(if (bufstats
.mBigSeekIndex
< MAX_BIG_SEEKS
)
204 bufstats
.mBigSeek
[bufstats
.mBigSeekIndex
]
205 .mOldOffset
= mBufferStartOffset
+ int64_t(mCursor
));
206 const int64_t minus1
= -1;
207 if (absPos
== minus1
) {
208 // then we had the SEEK_END case, above
210 rv
= ras
->Tell(&tellPos
);
211 mBufferStartOffset
= tellPos
;
216 mBufferStartOffset
= absPos
;
218 METER(if (bufstats
.mBigSeekIndex
< MAX_BIG_SEEKS
)
219 bufstats
.mBigSeek
[bufstats
.mBigSeekIndex
++]
220 .mNewOffset
= mBufferStartOffset
);
222 mFillPoint
= mCursor
= 0;
224 // If we seeked back to the start, then don't fill the buffer
225 // right now in case this is a lazily-opened file stream.
226 // We'll fill on the first read, like we did initially.
227 if (whence
== nsISeekableStream::NS_SEEK_SET
&& offset
== 0) {
234 nsBufferedStream::Tell(int64_t* result
) {
235 if (mStream
== nullptr) {
236 return NS_BASE_STREAM_CLOSED
;
239 int64_t result64
= mBufferStartOffset
;
246 nsBufferedStream::SetEOF() {
247 if (mStream
== nullptr) {
248 return NS_BASE_STREAM_CLOSED
;
252 nsCOMPtr
<nsISeekableStream
> ras
= do_QueryInterface(mStream
, &rv
);
258 if (NS_SUCCEEDED(rv
)) {
265 nsresult
nsBufferedStream::GetData(nsISupports
** aResult
) {
266 nsCOMPtr
<nsISupports
> stream(mStream
);
267 stream
.forget(aResult
);
271 ////////////////////////////////////////////////////////////////////////////////
272 // nsBufferedInputStream
274 NS_IMPL_ADDREF_INHERITED(nsBufferedInputStream
, nsBufferedStream
)
275 NS_IMPL_RELEASE_INHERITED(nsBufferedInputStream
, nsBufferedStream
)
277 NS_IMPL_CLASSINFO(nsBufferedInputStream
, nullptr, nsIClassInfo::THREADSAFE
,
278 NS_BUFFEREDINPUTSTREAM_CID
)
280 NS_INTERFACE_MAP_BEGIN(nsBufferedInputStream
)
281 // Unfortunately there isn't a macro that combines ambiguous and conditional,
282 // and as far as I can tell, no other class would need such a macro.
283 if (mIsAsyncInputStream
&& aIID
.Equals(NS_GET_IID(nsIInputStream
))) {
285 static_cast<nsIInputStream
*>(static_cast<nsIAsyncInputStream
*>(this));
286 } else if (!mIsAsyncInputStream
&& aIID
.Equals(NS_GET_IID(nsIInputStream
))) {
287 foundInterface
= static_cast<nsIInputStream
*>(
288 static_cast<nsIBufferedInputStream
*>(this));
290 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports
, nsIBufferedInputStream
)
291 NS_INTERFACE_MAP_ENTRY(nsIBufferedInputStream
)
292 NS_INTERFACE_MAP_ENTRY(nsIStreamBufferAccess
)
293 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream
,
295 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream
, mIsAsyncInputStream
)
296 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback
,
298 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream
,
299 mIsCloneableInputStream
)
300 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength
, mIsInputStreamLength
)
301 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength
,
302 mIsAsyncInputStreamLength
)
303 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLengthCallback
,
304 mIsAsyncInputStreamLength
)
305 NS_IMPL_QUERY_CLASSINFO(nsBufferedInputStream
)
306 NS_INTERFACE_MAP_END_INHERITING(nsBufferedStream
)
308 NS_IMPL_CI_INTERFACE_GETTER(nsBufferedInputStream
, nsIInputStream
,
309 nsIBufferedInputStream
, nsISeekableStream
,
310 nsITellableStream
, nsIStreamBufferAccess
)
312 nsresult
nsBufferedInputStream::Create(REFNSIID aIID
, void** aResult
) {
313 RefPtr
<nsBufferedInputStream
> stream
= new nsBufferedInputStream();
314 return stream
->QueryInterface(aIID
, aResult
);
318 nsBufferedInputStream::Init(nsIInputStream
* stream
, uint32_t bufferSize
) {
319 nsresult rv
= nsBufferedStream::Init(stream
, bufferSize
);
320 NS_ENSURE_SUCCESS(rv
, rv
);
323 nsCOMPtr
<nsIIPCSerializableInputStream
> stream
= do_QueryInterface(mStream
);
324 mIsIPCSerializable
= !!stream
;
328 nsCOMPtr
<nsIAsyncInputStream
> stream
= do_QueryInterface(mStream
);
329 mIsAsyncInputStream
= !!stream
;
333 nsCOMPtr
<nsICloneableInputStream
> stream
= do_QueryInterface(mStream
);
334 mIsCloneableInputStream
= !!stream
;
338 nsCOMPtr
<nsIInputStreamLength
> stream
= do_QueryInterface(mStream
);
339 mIsInputStreamLength
= !!stream
;
343 nsCOMPtr
<nsIAsyncInputStreamLength
> stream
= do_QueryInterface(mStream
);
344 mIsAsyncInputStreamLength
= !!stream
;
350 already_AddRefed
<nsIInputStream
> nsBufferedInputStream::GetInputStream() {
351 // A non-null mStream implies Init() has been called.
354 nsIInputStream
* out
= nullptr;
355 DebugOnly
<nsresult
> rv
= QueryInterface(NS_GET_IID(nsIInputStream
),
356 reinterpret_cast<void**>(&out
));
357 MOZ_ASSERT(NS_SUCCEEDED(rv
));
360 return already_AddRefed
<nsIInputStream
>(out
);
364 nsBufferedInputStream::Close() {
367 rv
= Source()->Close();
370 "(debug) Error: Source()->Close() returned error in "
371 "bsBuffedInputStream::Close().");
375 nsBufferedStream::Close();
380 nsBufferedInputStream::Available(uint64_t* result
) {
387 uint64_t avail
= mFillPoint
- mCursor
;
390 nsresult rv
= Source()->Available(&tmp
);
391 if (NS_SUCCEEDED(rv
)) {
404 nsBufferedInputStream::StreamStatus() {
409 if (mFillPoint
- mCursor
) {
413 return Source()->StreamStatus();
417 nsBufferedInputStream::Read(char* buf
, uint32_t count
, uint32_t* result
) {
418 if (mBufferDisabled
) {
423 nsresult rv
= Source()->Read(buf
, count
, result
);
424 if (NS_SUCCEEDED(rv
)) {
425 mBufferStartOffset
+= *result
; // so nsBufferedStream::Tell works
433 return ReadSegments(NS_CopySegmentToBuffer
, buf
, count
, result
);
437 nsBufferedInputStream::ReadSegments(nsWriteSegmentFun writer
, void* closure
,
438 uint32_t count
, uint32_t* result
) {
446 RecursiveMutexAutoLock
lock(mBufferMutex
);
448 uint32_t amt
= std::min(count
, mFillPoint
- mCursor
);
451 rv
= writer(static_cast<nsIBufferedInputStream
*>(this), closure
,
452 mBuffer
+ mCursor
, *result
, amt
, &read
);
454 // errors returned from the writer end here!
463 if (rv
== NS_BASE_STREAM_WOULD_BLOCK
) {
469 if (mFillPoint
== mCursor
) {
474 return (*result
> 0) ? NS_OK
: rv
;
478 nsBufferedInputStream::IsNonBlocking(bool* aNonBlocking
) {
480 return Source()->IsNonBlocking(aNonBlocking
);
482 return NS_ERROR_NOT_INITIALIZED
;
486 nsBufferedInputStream::Fill() {
487 if (mBufferDisabled
) {
490 NS_ENSURE_TRUE(mStream
, NS_ERROR_NOT_INITIALIZED
);
492 RecursiveMutexAutoLock
lock(mBufferMutex
);
495 int32_t rem
= int32_t(mFillPoint
- mCursor
);
497 // slide the remainder down to the start of the buffer
498 // |<------------->|<--rem-->|<--->|
500 memcpy(mBuffer
, mBuffer
+ mCursor
, rem
);
502 mBufferStartOffset
+= mCursor
;
507 rv
= Source()->Read(mBuffer
+ mFillPoint
, mBufferSize
- mFillPoint
, &amt
);
520 NS_IMETHODIMP_(char*)
521 nsBufferedInputStream::GetBuffer(uint32_t aLength
, uint32_t aAlignMask
) {
522 NS_ASSERTION(mGetBufferCount
== 0, "nested GetBuffer!");
523 if (mGetBufferCount
!= 0) {
527 if (mBufferDisabled
) {
531 RecursiveMutexAutoLock
lock(mBufferMutex
);
532 char* buf
= mBuffer
+ mCursor
;
533 uint32_t rem
= mFillPoint
- mCursor
;
535 if (NS_FAILED(Fill())) {
538 buf
= mBuffer
+ mCursor
;
539 rem
= mFillPoint
- mCursor
;
542 uint32_t mod
= (NS_PTR_TO_INT32(buf
) & aAlignMask
);
544 uint32_t pad
= aAlignMask
+ 1 - mod
;
563 nsBufferedInputStream::PutBuffer(char* aBuffer
, uint32_t aLength
) {
564 NS_ASSERTION(mGetBufferCount
== 1, "stray PutBuffer!");
565 if (--mGetBufferCount
!= 0) {
569 NS_ASSERTION(mCursor
+ aLength
<= mFillPoint
, "PutBuffer botch");
574 nsBufferedInputStream::DisableBuffering() {
575 NS_ASSERTION(!mBufferDisabled
, "redundant call to DisableBuffering!");
576 NS_ASSERTION(mGetBufferCount
== 0,
577 "DisableBuffer call between GetBuffer and PutBuffer!");
578 if (mGetBufferCount
!= 0) {
579 return NS_ERROR_UNEXPECTED
;
582 // Empty the buffer so nsBufferedStream::Tell works.
583 mBufferStartOffset
+= mCursor
;
584 mFillPoint
= mCursor
= 0;
585 mBufferDisabled
= true;
590 nsBufferedInputStream::EnableBuffering() {
591 NS_ASSERTION(mBufferDisabled
, "gratuitous call to EnableBuffering!");
592 mBufferDisabled
= false;
597 nsBufferedInputStream::GetUnbufferedStream(nsISupports
** aStream
) {
598 // Empty the buffer so subsequent i/o trumps any buffered data.
599 mBufferStartOffset
+= mCursor
;
600 mFillPoint
= mCursor
= 0;
602 nsCOMPtr
<nsISupports
> stream
= mStream
;
603 stream
.forget(aStream
);
607 void nsBufferedInputStream::SerializedComplexity(uint32_t aMaxSize
,
610 uint32_t* aTransferables
) {
612 nsCOMPtr
<nsIInputStream
> stream
= do_QueryInterface(mStream
);
615 InputStreamHelper::SerializedComplexity(stream
, aMaxSize
, aSizeUsed
, aPipes
,
620 void nsBufferedInputStream::Serialize(InputStreamParams
& aParams
,
621 uint32_t aMaxSize
, uint32_t* aSizeUsed
) {
622 MOZ_ASSERT(aSizeUsed
);
625 BufferedInputStreamParams params
;
628 nsCOMPtr
<nsIInputStream
> stream
= do_QueryInterface(mStream
);
631 InputStreamParams wrappedParams
;
632 InputStreamHelper::SerializeInputStream(stream
, wrappedParams
, aMaxSize
,
635 params
.optionalStream().emplace(wrappedParams
);
638 params
.bufferSize() = mBufferSize
;
643 bool nsBufferedInputStream::Deserialize(const InputStreamParams
& aParams
) {
644 if (aParams
.type() != InputStreamParams::TBufferedInputStreamParams
) {
645 NS_ERROR("Received unknown parameters from the other process!");
649 const BufferedInputStreamParams
& params
=
650 aParams
.get_BufferedInputStreamParams();
651 const Maybe
<InputStreamParams
>& wrappedParams
= params
.optionalStream();
653 nsCOMPtr
<nsIInputStream
> stream
;
654 if (wrappedParams
.isSome()) {
655 stream
= InputStreamHelper::DeserializeInputStream(wrappedParams
.ref());
657 NS_WARNING("Failed to deserialize wrapped stream!");
662 nsresult rv
= Init(stream
, params
.bufferSize());
663 NS_ENSURE_SUCCESS(rv
, false);
669 nsBufferedInputStream::CloseWithStatus(nsresult aStatus
) { return Close(); }
672 nsBufferedInputStream::AsyncWait(nsIInputStreamCallback
* aCallback
,
673 uint32_t aFlags
, uint32_t aRequestedCount
,
674 nsIEventTarget
* aEventTarget
) {
675 nsCOMPtr
<nsIAsyncInputStream
> stream
= do_QueryInterface(mStream
);
677 // Stream is probably closed. Callback, if not nullptr, can be executed
684 nsCOMPtr
<nsIInputStreamCallback
> callable
= NS_NewInputStreamReadyEvent(
685 "nsBufferedInputStream::OnInputStreamReady", aCallback
, aEventTarget
);
686 return callable
->OnInputStreamReady(this);
689 aCallback
->OnInputStreamReady(this);
693 nsCOMPtr
<nsIInputStreamCallback
> callback
= aCallback
? this : nullptr;
695 MutexAutoLock
lock(mMutex
);
697 if (NS_WARN_IF(mAsyncWaitCallback
&& aCallback
&&
698 mAsyncWaitCallback
!= aCallback
)) {
699 return NS_ERROR_FAILURE
;
702 mAsyncWaitCallback
= aCallback
;
705 return stream
->AsyncWait(callback
, aFlags
, aRequestedCount
, aEventTarget
);
709 nsBufferedInputStream::OnInputStreamReady(nsIAsyncInputStream
* aStream
) {
710 nsCOMPtr
<nsIInputStreamCallback
> callback
;
712 MutexAutoLock
lock(mMutex
);
714 // We have been canceled in the meanwhile.
715 if (!mAsyncWaitCallback
) {
719 callback
.swap(mAsyncWaitCallback
);
722 MOZ_ASSERT(callback
);
723 return callback
->OnInputStreamReady(this);
727 nsBufferedInputStream::GetData(nsIInputStream
** aResult
) {
728 nsCOMPtr
<nsISupports
> stream
;
729 nsBufferedStream::GetData(getter_AddRefs(stream
));
730 nsCOMPtr
<nsIInputStream
> inputStream
= do_QueryInterface(stream
);
731 inputStream
.forget(aResult
);
735 // nsICloneableInputStream interface
738 nsBufferedInputStream::GetCloneable(bool* aCloneable
) {
741 RecursiveMutexAutoLock
lock(mBufferMutex
);
743 // If we don't have the buffer, the inputStream has been already closed.
744 // If mBufferStartOffset is not 0, the stream has been seeked or read.
745 // In both cases cloning is not supported.
746 if (!mBuffer
|| mBufferStartOffset
) {
750 nsCOMPtr
<nsICloneableInputStream
> stream
= do_QueryInterface(mStream
);
752 // GetCloneable is infallible.
753 NS_ENSURE_TRUE(stream
, NS_OK
);
755 return stream
->GetCloneable(aCloneable
);
759 nsBufferedInputStream::Clone(nsIInputStream
** aResult
) {
760 RecursiveMutexAutoLock
lock(mBufferMutex
);
762 if (!mBuffer
|| mBufferStartOffset
) {
763 return NS_ERROR_FAILURE
;
766 nsCOMPtr
<nsICloneableInputStream
> stream
= do_QueryInterface(mStream
);
767 NS_ENSURE_TRUE(stream
, NS_ERROR_FAILURE
);
769 nsCOMPtr
<nsIInputStream
> clonedStream
;
770 nsresult rv
= stream
->Clone(getter_AddRefs(clonedStream
));
771 NS_ENSURE_SUCCESS(rv
, rv
);
773 nsCOMPtr
<nsIBufferedInputStream
> bis
= new nsBufferedInputStream();
774 rv
= bis
->Init(clonedStream
, mBufferSize
);
775 NS_ENSURE_SUCCESS(rv
, rv
);
778 static_cast<nsBufferedInputStream
*>(bis
.get())->GetInputStream().take();
783 // nsIInputStreamLength
786 nsBufferedInputStream::Length(int64_t* aLength
) {
787 nsCOMPtr
<nsIInputStreamLength
> stream
= do_QueryInterface(mStream
);
788 NS_ENSURE_TRUE(stream
, NS_ERROR_FAILURE
);
790 return stream
->Length(aLength
);
793 // nsIAsyncInputStreamLength
796 nsBufferedInputStream::AsyncLengthWait(nsIInputStreamLengthCallback
* aCallback
,
797 nsIEventTarget
* aEventTarget
) {
798 nsCOMPtr
<nsIAsyncInputStreamLength
> stream
= do_QueryInterface(mStream
);
800 // Stream is probably closed. Callback, if not nullptr, can be executed
803 const RefPtr
<nsBufferedInputStream
> self
= this;
804 const nsCOMPtr
<nsIInputStreamLengthCallback
> callback
= aCallback
;
805 nsCOMPtr
<nsIRunnable
> runnable
= NS_NewRunnableFunction(
806 "nsBufferedInputStream::OnInputStreamLengthReady",
807 [self
, callback
] { callback
->OnInputStreamLengthReady(self
, -1); });
810 aEventTarget
->Dispatch(runnable
, NS_DISPATCH_NORMAL
);
818 nsCOMPtr
<nsIInputStreamLengthCallback
> callback
= aCallback
? this : nullptr;
820 MutexAutoLock
lock(mMutex
);
821 mAsyncInputStreamLengthCallback
= aCallback
;
825 return stream
->AsyncLengthWait(callback
, aEventTarget
);
828 // nsIInputStreamLengthCallback
831 nsBufferedInputStream::OnInputStreamLengthReady(
832 nsIAsyncInputStreamLength
* aStream
, int64_t aLength
) {
833 nsCOMPtr
<nsIInputStreamLengthCallback
> callback
;
835 MutexAutoLock
lock(mMutex
);
836 // We have been canceled in the meanwhile.
837 if (!mAsyncInputStreamLengthCallback
) {
841 callback
.swap(mAsyncInputStreamLengthCallback
);
844 MOZ_ASSERT(callback
);
845 return callback
->OnInputStreamLengthReady(this, aLength
);
848 ////////////////////////////////////////////////////////////////////////////////
849 // nsBufferedOutputStream
851 NS_IMPL_ADDREF_INHERITED(nsBufferedOutputStream
, nsBufferedStream
)
852 NS_IMPL_RELEASE_INHERITED(nsBufferedOutputStream
, nsBufferedStream
)
// QueryInterface map for nsBufferedOutputStream.
853 // This QI uses NS_INTERFACE_MAP_ENTRY_CONDITIONAL to check for
854 // non-nullness of mSafeStream.
// mSafeStream is assigned in nsBufferedOutputStream::Init() by QI-ing the
// wrapped stream, so nsISafeOutputStream is only offered when the wrapped
// stream itself supports it.
855 NS_INTERFACE_MAP_BEGIN(nsBufferedOutputStream
)
856 NS_INTERFACE_MAP_ENTRY(nsIOutputStream
)
857 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISafeOutputStream
, mSafeStream
)
858 NS_INTERFACE_MAP_ENTRY(nsIBufferedOutputStream
)
859 NS_INTERFACE_MAP_ENTRY(nsIStreamBufferAccess
)
// Anything not matched above is handed to the nsBufferedStream map.
860 NS_INTERFACE_MAP_END_INHERITING(nsBufferedStream
)
862 nsresult
nsBufferedOutputStream::Create(REFNSIID aIID
, void** aResult
) {
863 RefPtr
<nsBufferedOutputStream
> stream
= new nsBufferedOutputStream();
864 return stream
->QueryInterface(aIID
, aResult
);
868 nsBufferedOutputStream::Init(nsIOutputStream
* stream
, uint32_t bufferSize
) {
869 // QI stream to an nsISafeOutputStream, to see if we should support it
870 mSafeStream
= do_QueryInterface(stream
);
872 return nsBufferedStream::Init(stream
, bufferSize
);
876 nsBufferedOutputStream::Close() {
881 nsresult rv1
, rv2
= NS_OK
;
886 if (NS_FAILED(rv1
)) {
888 "(debug) Flush() inside nsBufferedOutputStream::Close() returned error "
893 // If we fail to Flush all the data, then we close anyway and drop the
894 // remaining data in the buffer. We do this because it's what Unix does
895 // for fclose and close. However, we report the error from Flush anyway.
897 rv2
= Sink()->Close();
899 if (NS_FAILED(rv2
)) {
901 "(debug) Sink->Close() inside nsBufferedOutputStream::Close() "
902 "returned error (rv2).");
906 nsBufferedStream::Close();
908 if (NS_FAILED(rv1
)) {
911 if (NS_FAILED(rv2
)) {
918 nsBufferedOutputStream::StreamStatus() {
919 return mStream
? Sink()->StreamStatus() : NS_BASE_STREAM_CLOSED
;
923 nsBufferedOutputStream::Write(const char* buf
, uint32_t count
,
926 uint32_t written
= 0;
929 // We special case this situation.
930 // We should catch the failure, NS_BASE_STREAM_CLOSED ASAP, here.
931 // If we don't, eventually Flush() is called in the while loop below
932 // after so many writes.
933 // However, Flush() returns NS_OK when mStream is null (!!),
934 // and we don't get a meaningful error, NS_BASE_STREAM_CLOSED,
935 // soon enough when we use buffered output.
938 "(info) nsBufferedOutputStream::Write returns NS_BASE_STREAM_CLOSED "
939 "immediately (mStream==null).");
941 return NS_BASE_STREAM_CLOSED
;
944 RecursiveMutexAutoLock
lock(mBufferMutex
);
946 uint32_t amt
= std::min(count
, mBufferSize
- mCursor
);
948 memcpy(mBuffer
+ mCursor
, buf
+ written
, amt
);
952 if (mFillPoint
< mCursor
) mFillPoint
= mCursor
;
954 NS_ASSERTION(mFillPoint
, "loop in nsBufferedOutputStream::Write!");
959 "(debug) Flush() returned error in nsBufferedOutputStream::Write.");
966 return (written
> 0) ? NS_OK
: rv
;
970 nsBufferedOutputStream::Flush() {
974 // Stream already cancelled/flushed; probably because of previous error.
977 // optimize : some code within C-C needs to call Seek -> Flush() often.
978 if (mFillPoint
== 0) {
981 RecursiveMutexAutoLock
lock(mBufferMutex
);
982 rv
= Sink()->Write(mBuffer
, mFillPoint
, &amt
);
986 mBufferStartOffset
+= amt
;
987 if (amt
== mFillPoint
) {
988 mFillPoint
= mCursor
= 0;
989 return NS_OK
; // flushed everything
992 // slide the remainder down to the start of the buffer
993 // |<-------------->|<---|----->|
995 uint32_t rem
= mFillPoint
- amt
;
996 memmove(mBuffer
, mBuffer
+ amt
, rem
);
997 mFillPoint
= mCursor
= rem
;
998 return NS_ERROR_FAILURE
; // didn't flush all
1001 // nsISafeOutputStream
1003 nsBufferedOutputStream::Finish() {
1004 // flush the stream, to write out any buffered data...
1005 nsresult rv1
= nsBufferedOutputStream::Flush();
1006 nsresult rv2
= NS_OK
;
1008 if (NS_FAILED(rv1
)) {
1010 "(debug) nsBufferedOutputStream::Flush() failed in "
1011 "nsBufferedOutputStream::Finish()! Possible dataloss.");
1013 rv2
= Sink()->Close();
1014 if (NS_FAILED(rv2
)) {
1016 "(debug) Sink()->Close() failed in nsBufferedOutputStream::Finish()! "
1017 "Possible dataloss.");
1020 rv2
= mSafeStream
->Finish();
1021 if (NS_FAILED(rv2
)) {
1023 "(debug) mSafeStream->Finish() failed within "
1024 "nsBufferedOutputStream::Flush()! Possible dataloss.");
1028 // ... and close the buffered stream, so any further attempts to flush/close
1029 // the buffered stream won't cause errors.
1030 nsBufferedStream::Close();
1032 // We want to return the errors precisely from Finish()
1033 // and mimic the existing error handling in
1034 // nsBufferedOutputStream::Close() as reference.
1036 if (NS_FAILED(rv1
)) {
1039 if (NS_FAILED(rv2
)) {
1046 nsBufferedOutputStream::WriteFrom(nsIInputStream
* inStr
, uint32_t count
,
1047 uint32_t* _retval
) {
1048 return WriteSegments(NS_CopyStreamToSegment
, inStr
, count
, _retval
);
1052 nsBufferedOutputStream::WriteSegments(nsReadSegmentFun reader
, void* closure
,
1053 uint32_t count
, uint32_t* _retval
) {
1056 RecursiveMutexAutoLock
lock(mBufferMutex
);
1058 uint32_t left
= std::min(count
, mBufferSize
- mCursor
);
1061 if (NS_FAILED(rv
)) {
1062 return (*_retval
> 0) ? NS_OK
: rv
;
1069 rv
= reader(this, closure
, mBuffer
+ mCursor
, *_retval
, left
, &read
);
1071 if (NS_FAILED(rv
)) { // If we have read some data, return ok
1072 return (*_retval
> 0) ? NS_OK
: rv
;
1077 mFillPoint
= std::max(mFillPoint
, mCursor
);
1083 nsBufferedOutputStream::IsNonBlocking(bool* aNonBlocking
) {
1085 return Sink()->IsNonBlocking(aNonBlocking
);
1087 return NS_ERROR_NOT_INITIALIZED
;
1090 NS_IMETHODIMP_(char*)
1091 nsBufferedOutputStream::GetBuffer(uint32_t aLength
, uint32_t aAlignMask
) {
1092 NS_ASSERTION(mGetBufferCount
== 0, "nested GetBuffer!");
1093 if (mGetBufferCount
!= 0) {
1097 if (mBufferDisabled
) {
1101 RecursiveMutexAutoLock
lock(mBufferMutex
);
1102 char* buf
= mBuffer
+ mCursor
;
1103 uint32_t rem
= mBufferSize
- mCursor
;
1105 if (NS_FAILED(Flush())) {
1108 buf
= mBuffer
+ mCursor
;
1109 rem
= mBufferSize
- mCursor
;
1112 uint32_t mod
= (NS_PTR_TO_INT32(buf
) & aAlignMask
);
1114 uint32_t pad
= aAlignMask
+ 1 - mod
;
1119 memset(buf
, 0, pad
);
1125 if (aLength
> rem
) {
1132 NS_IMETHODIMP_(void)
1133 nsBufferedOutputStream::PutBuffer(char* aBuffer
, uint32_t aLength
) {
1134 NS_ASSERTION(mGetBufferCount
== 1, "stray PutBuffer!");
1135 if (--mGetBufferCount
!= 0) {
1139 NS_ASSERTION(mCursor
+ aLength
<= mBufferSize
, "PutBuffer botch");
1141 if (mFillPoint
< mCursor
) {
1142 mFillPoint
= mCursor
;
1147 nsBufferedOutputStream::DisableBuffering() {
1148 NS_ASSERTION(!mBufferDisabled
, "redundant call to DisableBuffering!");
1149 NS_ASSERTION(mGetBufferCount
== 0,
1150 "DisableBuffer call between GetBuffer and PutBuffer!");
1151 if (mGetBufferCount
!= 0) {
1152 return NS_ERROR_UNEXPECTED
;
1155 // Empty the buffer so nsBufferedStream::Tell works.
1156 nsresult rv
= Flush();
1157 if (NS_FAILED(rv
)) {
1161 mBufferDisabled
= true;
1166 nsBufferedOutputStream::EnableBuffering() {
1167 NS_ASSERTION(mBufferDisabled
, "gratuitous call to EnableBuffering!");
1168 mBufferDisabled
= false;
1173 nsBufferedOutputStream::GetUnbufferedStream(nsISupports
** aStream
) {
1174 // Empty the buffer so subsequent i/o trumps any buffered data.
1176 nsresult rv
= Flush();
1177 if (NS_FAILED(rv
)) {
1182 nsCOMPtr
<nsISupports
> stream
= mStream
;
1183 stream
.forget(aStream
);
1188 nsBufferedOutputStream::GetData(nsIOutputStream
** aResult
) {
1189 nsCOMPtr
<nsISupports
> stream
;
1190 nsBufferedStream::GetData(getter_AddRefs(stream
));
1191 nsCOMPtr
<nsIOutputStream
> outputStream
= do_QueryInterface(stream
);
1192 outputStream
.forget(aResult
);
1197 ////////////////////////////////////////////////////////////////////////////////