Merge mozilla-central to autoland. CLOSED TREE
[gecko.git] / netwerk / base / nsBufferedStreams.cpp
blob2606352cc48247b4790890906d90c71ac122278d
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 #include "nsBufferedStreams.h"
7 #include "nsStreamUtils.h"
8 #include "nsNetCID.h"
9 #include "nsIClassInfoImpl.h"
10 #include "nsIEventTarget.h"
11 #include "nsThreadUtils.h"
12 #include "mozilla/DebugOnly.h"
13 #include "mozilla/ipc/InputStreamUtils.h"
14 #include <algorithm>
16 #ifdef DEBUG_brendan
17 # define METERING
18 #endif
20 #ifdef METERING
21 # include <stdio.h>
22 # define METER(x) x
23 # define MAX_BIG_SEEKS 20
25 static struct {
26 uint32_t mSeeksWithinBuffer;
27 uint32_t mSeeksOutsideBuffer;
28 uint32_t mBufferReadUponSeek;
29 uint32_t mBufferUnreadUponSeek;
30 uint32_t mBytesReadFromBuffer;
31 uint32_t mBigSeekIndex;
32 struct {
33 int64_t mOldOffset;
34 int64_t mNewOffset;
35 } mBigSeek[MAX_BIG_SEEKS];
36 } bufstats;
37 #else
38 # define METER(x) /* nothing */
39 #endif
41 using namespace mozilla::ipc;
42 using namespace mozilla;
44 ////////////////////////////////////////////////////////////////////////////////
45 // nsBufferedStream
47 nsBufferedStream::~nsBufferedStream() { Close(); }
49 NS_IMPL_ADDREF(nsBufferedStream)
50 NS_IMPL_RELEASE(nsBufferedStream)
52 NS_INTERFACE_MAP_BEGIN(nsBufferedStream)
53 NS_INTERFACE_MAP_ENTRY(nsISupports)
54 NS_INTERFACE_MAP_ENTRY(nsITellableStream)
55 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, mSeekable)
56 NS_INTERFACE_MAP_END
58 nsresult nsBufferedStream::Init(nsISupports* aStream, uint32_t bufferSize) {
59 NS_ASSERTION(aStream, "need to supply a stream");
60 NS_ASSERTION(mStream == nullptr, "already inited");
61 mStream = aStream; // we keep a reference until nsBufferedStream::Close
62 mBufferSize = bufferSize;
63 mBufferStartOffset = 0;
64 mCursor = 0;
65 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
66 mSeekable = seekable;
67 RecursiveMutexAutoLock lock(mBufferMutex);
68 mBuffer = new (mozilla::fallible) char[bufferSize];
69 if (mBuffer == nullptr) {
70 return NS_ERROR_OUT_OF_MEMORY;
72 return NS_OK;
75 void nsBufferedStream::Close() {
76 // Drop the reference from nsBufferedStream::Init()
77 mStream = nullptr;
78 RecursiveMutexAutoLock lock(mBufferMutex);
79 if (mBuffer) {
80 delete[] mBuffer;
81 mBuffer = nullptr;
82 mBufferSize = 0;
83 mBufferStartOffset = 0;
84 mCursor = 0;
85 mFillPoint = 0;
87 #ifdef METERING
89 static FILE* tfp;
90 if (!tfp) {
91 tfp = fopen("/tmp/bufstats", "w");
92 if (tfp) {
93 setvbuf(tfp, nullptr, _IOLBF, 0);
96 if (tfp) {
97 fprintf(tfp, "seeks within buffer: %u\n", bufstats.mSeeksWithinBuffer);
98 fprintf(tfp, "seeks outside buffer: %u\n",
99 bufstats.mSeeksOutsideBuffer);
100 fprintf(tfp, "buffer read on seek: %u\n",
101 bufstats.mBufferReadUponSeek);
102 fprintf(tfp, "buffer unread on seek: %u\n",
103 bufstats.mBufferUnreadUponSeek);
104 fprintf(tfp, "bytes read from buffer: %u\n",
105 bufstats.mBytesReadFromBuffer);
106 for (uint32_t i = 0; i < bufstats.mBigSeekIndex; i++) {
107 fprintf(tfp, "bigseek[%u] = {old: %u, new: %u}\n", i,
108 bufstats.mBigSeek[i].mOldOffset,
109 bufstats.mBigSeek[i].mNewOffset);
113 #endif
116 NS_IMETHODIMP
117 nsBufferedStream::Seek(int32_t whence, int64_t offset) {
118 if (mStream == nullptr) {
119 return NS_BASE_STREAM_CLOSED;
122 // If the underlying stream isn't a random access store, then fail early.
123 // We could possibly succeed for the case where the seek position denotes
124 // something that happens to be read into the buffer, but that would make
125 // the failure data-dependent.
126 nsresult rv;
127 nsCOMPtr<nsISeekableStream> ras = do_QueryInterface(mStream, &rv);
128 if (NS_FAILED(rv)) {
129 NS_WARNING("mStream doesn't QI to nsISeekableStream");
130 return rv;
133 int64_t absPos = 0;
134 switch (whence) {
135 case nsISeekableStream::NS_SEEK_SET:
136 absPos = offset;
137 break;
138 case nsISeekableStream::NS_SEEK_CUR:
139 absPos = mBufferStartOffset;
140 absPos += mCursor;
141 absPos += offset;
142 break;
143 case nsISeekableStream::NS_SEEK_END:
144 absPos = -1;
145 break;
146 default:
147 MOZ_ASSERT_UNREACHABLE("bogus seek whence parameter");
148 return NS_ERROR_UNEXPECTED;
151 // Let mCursor point into the existing buffer if the new position is
152 // between the current cursor and the mFillPoint "fencepost" -- the
153 // client may never get around to a Read or Write after this Seek.
154 // Read and Write worry about flushing and filling in that event.
155 // But if we're at EOF, make sure to pass the seek through to the
156 // underlying stream, because it may have auto-closed itself and
157 // needs to reopen.
158 uint32_t offsetInBuffer = uint32_t(absPos - mBufferStartOffset);
159 if (offsetInBuffer <= mFillPoint && !mEOF) {
160 METER(bufstats.mSeeksWithinBuffer++);
161 mCursor = offsetInBuffer;
162 return NS_OK;
165 METER(bufstats.mSeeksOutsideBuffer++);
166 METER(bufstats.mBufferReadUponSeek += mCursor);
167 METER(bufstats.mBufferUnreadUponSeek += mFillPoint - mCursor);
168 rv = Flush();
169 if (NS_FAILED(rv)) {
170 #ifdef DEBUG
171 NS_WARNING(
172 "(debug) Flush returned error within nsBufferedStream::Seek, so we "
173 "exit early.");
174 #endif
175 return rv;
178 rv = ras->Seek(whence, offset);
179 if (NS_FAILED(rv)) {
180 #ifdef DEBUG
181 NS_WARNING(
182 "(debug) Error: ras->Seek() returned error within "
183 "nsBufferedStream::Seek, so we exit early.");
184 #endif
185 return rv;
188 mEOF = false;
190 // Recompute whether the offset we're seeking to is in our buffer.
191 // Note that we need to recompute because Flush() might have
192 // changed mBufferStartOffset.
193 offsetInBuffer = uint32_t(absPos - mBufferStartOffset);
194 if (offsetInBuffer <= mFillPoint) {
195 // It's safe to just set mCursor to offsetInBuffer. In particular, we
196 // want to avoid calling Fill() here since we already have the data that
197 // was seeked to and calling Fill() might auto-close our underlying
198 // stream in some cases.
199 mCursor = offsetInBuffer;
200 return NS_OK;
203 METER(if (bufstats.mBigSeekIndex < MAX_BIG_SEEKS)
204 bufstats.mBigSeek[bufstats.mBigSeekIndex]
205 .mOldOffset = mBufferStartOffset + int64_t(mCursor));
206 const int64_t minus1 = -1;
207 if (absPos == minus1) {
208 // then we had the SEEK_END case, above
209 int64_t tellPos;
210 rv = ras->Tell(&tellPos);
211 mBufferStartOffset = tellPos;
212 if (NS_FAILED(rv)) {
213 return rv;
215 } else {
216 mBufferStartOffset = absPos;
218 METER(if (bufstats.mBigSeekIndex < MAX_BIG_SEEKS)
219 bufstats.mBigSeek[bufstats.mBigSeekIndex++]
220 .mNewOffset = mBufferStartOffset);
222 mFillPoint = mCursor = 0;
224 // If we seeked back to the start, then don't fill the buffer
225 // right now in case this is a lazily-opened file stream.
226 // We'll fill on the first read, like we did initially.
227 if (whence == nsISeekableStream::NS_SEEK_SET && offset == 0) {
228 return NS_OK;
230 return Fill();
233 NS_IMETHODIMP
234 nsBufferedStream::Tell(int64_t* result) {
235 if (mStream == nullptr) {
236 return NS_BASE_STREAM_CLOSED;
239 int64_t result64 = mBufferStartOffset;
240 result64 += mCursor;
241 *result = result64;
242 return NS_OK;
245 NS_IMETHODIMP
246 nsBufferedStream::SetEOF() {
247 if (mStream == nullptr) {
248 return NS_BASE_STREAM_CLOSED;
251 nsresult rv;
252 nsCOMPtr<nsISeekableStream> ras = do_QueryInterface(mStream, &rv);
253 if (NS_FAILED(rv)) {
254 return rv;
257 rv = ras->SetEOF();
258 if (NS_SUCCEEDED(rv)) {
259 mEOF = true;
262 return rv;
265 nsresult nsBufferedStream::GetData(nsISupports** aResult) {
266 nsCOMPtr<nsISupports> stream(mStream);
267 stream.forget(aResult);
268 return NS_OK;
271 ////////////////////////////////////////////////////////////////////////////////
272 // nsBufferedInputStream
274 NS_IMPL_ADDREF_INHERITED(nsBufferedInputStream, nsBufferedStream)
275 NS_IMPL_RELEASE_INHERITED(nsBufferedInputStream, nsBufferedStream)
277 NS_IMPL_CLASSINFO(nsBufferedInputStream, nullptr, nsIClassInfo::THREADSAFE,
278 NS_BUFFEREDINPUTSTREAM_CID)
280 NS_INTERFACE_MAP_BEGIN(nsBufferedInputStream)
281 // Unfortunately there isn't a macro that combines ambiguous and conditional,
282 // and as far as I can tell, no other class would need such a macro.
283 if (mIsAsyncInputStream && aIID.Equals(NS_GET_IID(nsIInputStream))) {
284 foundInterface =
285 static_cast<nsIInputStream*>(static_cast<nsIAsyncInputStream*>(this));
286 } else if (!mIsAsyncInputStream && aIID.Equals(NS_GET_IID(nsIInputStream))) {
287 foundInterface = static_cast<nsIInputStream*>(
288 static_cast<nsIBufferedInputStream*>(this));
289 } else
290 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIBufferedInputStream)
291 NS_INTERFACE_MAP_ENTRY(nsIBufferedInputStream)
292 NS_INTERFACE_MAP_ENTRY(nsIStreamBufferAccess)
293 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
294 mIsIPCSerializable)
295 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, mIsAsyncInputStream)
296 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback,
297 mIsAsyncInputStream)
298 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
299 mIsCloneableInputStream)
300 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength, mIsInputStreamLength)
301 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength,
302 mIsAsyncInputStreamLength)
303 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLengthCallback,
304 mIsAsyncInputStreamLength)
305 NS_IMPL_QUERY_CLASSINFO(nsBufferedInputStream)
306 NS_INTERFACE_MAP_END_INHERITING(nsBufferedStream)
308 NS_IMPL_CI_INTERFACE_GETTER(nsBufferedInputStream, nsIInputStream,
309 nsIBufferedInputStream, nsISeekableStream,
310 nsITellableStream, nsIStreamBufferAccess)
312 nsresult nsBufferedInputStream::Create(REFNSIID aIID, void** aResult) {
313 RefPtr<nsBufferedInputStream> stream = new nsBufferedInputStream();
314 return stream->QueryInterface(aIID, aResult);
317 NS_IMETHODIMP
318 nsBufferedInputStream::Init(nsIInputStream* stream, uint32_t bufferSize) {
319 nsresult rv = nsBufferedStream::Init(stream, bufferSize);
320 NS_ENSURE_SUCCESS(rv, rv);
323 nsCOMPtr<nsIIPCSerializableInputStream> stream = do_QueryInterface(mStream);
324 mIsIPCSerializable = !!stream;
328 nsCOMPtr<nsIAsyncInputStream> stream = do_QueryInterface(mStream);
329 mIsAsyncInputStream = !!stream;
333 nsCOMPtr<nsICloneableInputStream> stream = do_QueryInterface(mStream);
334 mIsCloneableInputStream = !!stream;
338 nsCOMPtr<nsIInputStreamLength> stream = do_QueryInterface(mStream);
339 mIsInputStreamLength = !!stream;
343 nsCOMPtr<nsIAsyncInputStreamLength> stream = do_QueryInterface(mStream);
344 mIsAsyncInputStreamLength = !!stream;
347 return NS_OK;
350 already_AddRefed<nsIInputStream> nsBufferedInputStream::GetInputStream() {
351 // A non-null mStream implies Init() has been called.
352 MOZ_ASSERT(mStream);
354 nsIInputStream* out = nullptr;
355 DebugOnly<nsresult> rv = QueryInterface(NS_GET_IID(nsIInputStream),
356 reinterpret_cast<void**>(&out));
357 MOZ_ASSERT(NS_SUCCEEDED(rv));
358 MOZ_ASSERT(out);
360 return already_AddRefed<nsIInputStream>(out);
363 NS_IMETHODIMP
364 nsBufferedInputStream::Close() {
365 nsresult rv = NS_OK;
366 if (mStream) {
367 rv = Source()->Close();
368 if (NS_FAILED(rv)) {
369 NS_WARNING(
370 "(debug) Error: Source()->Close() returned error in "
371 "bsBuffedInputStream::Close().");
375 nsBufferedStream::Close();
376 return rv;
379 NS_IMETHODIMP
380 nsBufferedInputStream::Available(uint64_t* result) {
381 *result = 0;
383 if (!mStream) {
384 return NS_OK;
387 uint64_t avail = mFillPoint - mCursor;
389 uint64_t tmp;
390 nsresult rv = Source()->Available(&tmp);
391 if (NS_SUCCEEDED(rv)) {
392 avail += tmp;
395 if (avail) {
396 *result = avail;
397 return NS_OK;
400 return rv;
403 NS_IMETHODIMP
404 nsBufferedInputStream::StreamStatus() {
405 if (!mStream) {
406 return NS_OK;
409 if (mFillPoint - mCursor) {
410 return NS_OK;
413 return Source()->StreamStatus();
416 NS_IMETHODIMP
417 nsBufferedInputStream::Read(char* buf, uint32_t count, uint32_t* result) {
418 if (mBufferDisabled) {
419 if (!mStream) {
420 *result = 0;
421 return NS_OK;
423 nsresult rv = Source()->Read(buf, count, result);
424 if (NS_SUCCEEDED(rv)) {
425 mBufferStartOffset += *result; // so nsBufferedStream::Tell works
426 if (*result == 0) {
427 mEOF = true;
430 return rv;
433 return ReadSegments(NS_CopySegmentToBuffer, buf, count, result);
436 NS_IMETHODIMP
437 nsBufferedInputStream::ReadSegments(nsWriteSegmentFun writer, void* closure,
438 uint32_t count, uint32_t* result) {
439 *result = 0;
441 if (!mStream) {
442 return NS_OK;
445 nsresult rv = NS_OK;
446 RecursiveMutexAutoLock lock(mBufferMutex);
447 while (count > 0) {
448 uint32_t amt = std::min(count, mFillPoint - mCursor);
449 if (amt > 0) {
450 uint32_t read = 0;
451 rv = writer(static_cast<nsIBufferedInputStream*>(this), closure,
452 mBuffer + mCursor, *result, amt, &read);
453 if (NS_FAILED(rv)) {
454 // errors returned from the writer end here!
455 rv = NS_OK;
456 break;
458 *result += read;
459 count -= read;
460 mCursor += read;
461 } else {
462 rv = Fill();
463 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
464 break;
466 if (NS_FAILED(rv)) {
467 return rv;
469 if (mFillPoint == mCursor) {
470 break;
474 return (*result > 0) ? NS_OK : rv;
477 NS_IMETHODIMP
478 nsBufferedInputStream::IsNonBlocking(bool* aNonBlocking) {
479 if (mStream) {
480 return Source()->IsNonBlocking(aNonBlocking);
482 return NS_ERROR_NOT_INITIALIZED;
485 NS_IMETHODIMP
486 nsBufferedInputStream::Fill() {
487 if (mBufferDisabled) {
488 return NS_OK;
490 NS_ENSURE_TRUE(mStream, NS_ERROR_NOT_INITIALIZED);
492 RecursiveMutexAutoLock lock(mBufferMutex);
494 nsresult rv;
495 int32_t rem = int32_t(mFillPoint - mCursor);
496 if (rem > 0) {
497 // slide the remainder down to the start of the buffer
498 // |<------------->|<--rem-->|<--->|
499 // b c f s
500 memcpy(mBuffer, mBuffer + mCursor, rem);
502 mBufferStartOffset += mCursor;
503 mFillPoint = rem;
504 mCursor = 0;
506 uint32_t amt;
507 rv = Source()->Read(mBuffer + mFillPoint, mBufferSize - mFillPoint, &amt);
508 if (NS_FAILED(rv)) {
509 return rv;
512 if (amt == 0) {
513 mEOF = true;
516 mFillPoint += amt;
517 return NS_OK;
520 NS_IMETHODIMP_(char*)
521 nsBufferedInputStream::GetBuffer(uint32_t aLength, uint32_t aAlignMask) {
522 NS_ASSERTION(mGetBufferCount == 0, "nested GetBuffer!");
523 if (mGetBufferCount != 0) {
524 return nullptr;
527 if (mBufferDisabled) {
528 return nullptr;
531 RecursiveMutexAutoLock lock(mBufferMutex);
532 char* buf = mBuffer + mCursor;
533 uint32_t rem = mFillPoint - mCursor;
534 if (rem == 0) {
535 if (NS_FAILED(Fill())) {
536 return nullptr;
538 buf = mBuffer + mCursor;
539 rem = mFillPoint - mCursor;
542 uint32_t mod = (NS_PTR_TO_INT32(buf) & aAlignMask);
543 if (mod) {
544 uint32_t pad = aAlignMask + 1 - mod;
545 if (pad > rem) {
546 return nullptr;
549 memset(buf, 0, pad);
550 mCursor += pad;
551 buf += pad;
552 rem -= pad;
555 if (aLength > rem) {
556 return nullptr;
558 mGetBufferCount++;
559 return buf;
562 NS_IMETHODIMP_(void)
563 nsBufferedInputStream::PutBuffer(char* aBuffer, uint32_t aLength) {
564 NS_ASSERTION(mGetBufferCount == 1, "stray PutBuffer!");
565 if (--mGetBufferCount != 0) {
566 return;
569 NS_ASSERTION(mCursor + aLength <= mFillPoint, "PutBuffer botch");
570 mCursor += aLength;
573 NS_IMETHODIMP
574 nsBufferedInputStream::DisableBuffering() {
575 NS_ASSERTION(!mBufferDisabled, "redundant call to DisableBuffering!");
576 NS_ASSERTION(mGetBufferCount == 0,
577 "DisableBuffer call between GetBuffer and PutBuffer!");
578 if (mGetBufferCount != 0) {
579 return NS_ERROR_UNEXPECTED;
582 // Empty the buffer so nsBufferedStream::Tell works.
583 mBufferStartOffset += mCursor;
584 mFillPoint = mCursor = 0;
585 mBufferDisabled = true;
586 return NS_OK;
589 NS_IMETHODIMP
590 nsBufferedInputStream::EnableBuffering() {
591 NS_ASSERTION(mBufferDisabled, "gratuitous call to EnableBuffering!");
592 mBufferDisabled = false;
593 return NS_OK;
596 NS_IMETHODIMP
597 nsBufferedInputStream::GetUnbufferedStream(nsISupports** aStream) {
598 // Empty the buffer so subsequent i/o trumps any buffered data.
599 mBufferStartOffset += mCursor;
600 mFillPoint = mCursor = 0;
602 nsCOMPtr<nsISupports> stream = mStream;
603 stream.forget(aStream);
604 return NS_OK;
607 void nsBufferedInputStream::SerializedComplexity(uint32_t aMaxSize,
608 uint32_t* aSizeUsed,
609 uint32_t* aPipes,
610 uint32_t* aTransferables) {
611 if (mStream) {
612 nsCOMPtr<nsIInputStream> stream = do_QueryInterface(mStream);
613 MOZ_ASSERT(stream);
615 InputStreamHelper::SerializedComplexity(stream, aMaxSize, aSizeUsed, aPipes,
616 aTransferables);
620 void nsBufferedInputStream::Serialize(InputStreamParams& aParams,
621 uint32_t aMaxSize, uint32_t* aSizeUsed) {
622 MOZ_ASSERT(aSizeUsed);
623 *aSizeUsed = 0;
625 BufferedInputStreamParams params;
627 if (mStream) {
628 nsCOMPtr<nsIInputStream> stream = do_QueryInterface(mStream);
629 MOZ_ASSERT(stream);
631 InputStreamParams wrappedParams;
632 InputStreamHelper::SerializeInputStream(stream, wrappedParams, aMaxSize,
633 aSizeUsed);
635 params.optionalStream().emplace(wrappedParams);
638 params.bufferSize() = mBufferSize;
640 aParams = params;
643 bool nsBufferedInputStream::Deserialize(const InputStreamParams& aParams) {
644 if (aParams.type() != InputStreamParams::TBufferedInputStreamParams) {
645 NS_ERROR("Received unknown parameters from the other process!");
646 return false;
649 const BufferedInputStreamParams& params =
650 aParams.get_BufferedInputStreamParams();
651 const Maybe<InputStreamParams>& wrappedParams = params.optionalStream();
653 nsCOMPtr<nsIInputStream> stream;
654 if (wrappedParams.isSome()) {
655 stream = InputStreamHelper::DeserializeInputStream(wrappedParams.ref());
656 if (!stream) {
657 NS_WARNING("Failed to deserialize wrapped stream!");
658 return false;
662 nsresult rv = Init(stream, params.bufferSize());
663 NS_ENSURE_SUCCESS(rv, false);
665 return true;
668 NS_IMETHODIMP
669 nsBufferedInputStream::CloseWithStatus(nsresult aStatus) { return Close(); }
671 NS_IMETHODIMP
672 nsBufferedInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
673 uint32_t aFlags, uint32_t aRequestedCount,
674 nsIEventTarget* aEventTarget) {
675 nsCOMPtr<nsIAsyncInputStream> stream = do_QueryInterface(mStream);
676 if (!stream) {
677 // Stream is probably closed. Callback, if not nullptr, can be executed
678 // immediately
679 if (!aCallback) {
680 return NS_OK;
683 if (aEventTarget) {
684 nsCOMPtr<nsIInputStreamCallback> callable = NS_NewInputStreamReadyEvent(
685 "nsBufferedInputStream::OnInputStreamReady", aCallback, aEventTarget);
686 return callable->OnInputStreamReady(this);
689 aCallback->OnInputStreamReady(this);
690 return NS_OK;
693 nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr;
695 MutexAutoLock lock(mMutex);
697 if (NS_WARN_IF(mAsyncWaitCallback && aCallback &&
698 mAsyncWaitCallback != aCallback)) {
699 return NS_ERROR_FAILURE;
702 mAsyncWaitCallback = aCallback;
705 return stream->AsyncWait(callback, aFlags, aRequestedCount, aEventTarget);
708 NS_IMETHODIMP
709 nsBufferedInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
710 nsCOMPtr<nsIInputStreamCallback> callback;
712 MutexAutoLock lock(mMutex);
714 // We have been canceled in the meanwhile.
715 if (!mAsyncWaitCallback) {
716 return NS_OK;
719 callback.swap(mAsyncWaitCallback);
722 MOZ_ASSERT(callback);
723 return callback->OnInputStreamReady(this);
726 NS_IMETHODIMP
727 nsBufferedInputStream::GetData(nsIInputStream** aResult) {
728 nsCOMPtr<nsISupports> stream;
729 nsBufferedStream::GetData(getter_AddRefs(stream));
730 nsCOMPtr<nsIInputStream> inputStream = do_QueryInterface(stream);
731 inputStream.forget(aResult);
732 return NS_OK;
735 // nsICloneableInputStream interface
737 NS_IMETHODIMP
738 nsBufferedInputStream::GetCloneable(bool* aCloneable) {
739 *aCloneable = false;
741 RecursiveMutexAutoLock lock(mBufferMutex);
743 // If we don't have the buffer, the inputStream has been already closed.
744 // If mBufferStartOffset is not 0, the stream has been seeked or read.
745 // In both case the cloning is not supported.
746 if (!mBuffer || mBufferStartOffset) {
747 return NS_OK;
750 nsCOMPtr<nsICloneableInputStream> stream = do_QueryInterface(mStream);
752 // GetCloneable is infallible.
753 NS_ENSURE_TRUE(stream, NS_OK);
755 return stream->GetCloneable(aCloneable);
758 NS_IMETHODIMP
759 nsBufferedInputStream::Clone(nsIInputStream** aResult) {
760 RecursiveMutexAutoLock lock(mBufferMutex);
762 if (!mBuffer || mBufferStartOffset) {
763 return NS_ERROR_FAILURE;
766 nsCOMPtr<nsICloneableInputStream> stream = do_QueryInterface(mStream);
767 NS_ENSURE_TRUE(stream, NS_ERROR_FAILURE);
769 nsCOMPtr<nsIInputStream> clonedStream;
770 nsresult rv = stream->Clone(getter_AddRefs(clonedStream));
771 NS_ENSURE_SUCCESS(rv, rv);
773 nsCOMPtr<nsIBufferedInputStream> bis = new nsBufferedInputStream();
774 rv = bis->Init(clonedStream, mBufferSize);
775 NS_ENSURE_SUCCESS(rv, rv);
777 *aResult =
778 static_cast<nsBufferedInputStream*>(bis.get())->GetInputStream().take();
780 return NS_OK;
783 // nsIInputStreamLength
785 NS_IMETHODIMP
786 nsBufferedInputStream::Length(int64_t* aLength) {
787 nsCOMPtr<nsIInputStreamLength> stream = do_QueryInterface(mStream);
788 NS_ENSURE_TRUE(stream, NS_ERROR_FAILURE);
790 return stream->Length(aLength);
793 // nsIAsyncInputStreamLength
795 NS_IMETHODIMP
796 nsBufferedInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback,
797 nsIEventTarget* aEventTarget) {
798 nsCOMPtr<nsIAsyncInputStreamLength> stream = do_QueryInterface(mStream);
799 if (!stream) {
800 // Stream is probably closed. Callback, if not nullptr, can be executed
801 // immediately
802 if (aCallback) {
803 const RefPtr<nsBufferedInputStream> self = this;
804 const nsCOMPtr<nsIInputStreamLengthCallback> callback = aCallback;
805 nsCOMPtr<nsIRunnable> runnable = NS_NewRunnableFunction(
806 "nsBufferedInputStream::OnInputStreamLengthReady",
807 [self, callback] { callback->OnInputStreamLengthReady(self, -1); });
809 if (aEventTarget) {
810 aEventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
811 } else {
812 runnable->Run();
815 return NS_OK;
818 nsCOMPtr<nsIInputStreamLengthCallback> callback = aCallback ? this : nullptr;
820 MutexAutoLock lock(mMutex);
821 mAsyncInputStreamLengthCallback = aCallback;
824 MOZ_ASSERT(stream);
825 return stream->AsyncLengthWait(callback, aEventTarget);
828 // nsIInputStreamLengthCallback
830 NS_IMETHODIMP
831 nsBufferedInputStream::OnInputStreamLengthReady(
832 nsIAsyncInputStreamLength* aStream, int64_t aLength) {
833 nsCOMPtr<nsIInputStreamLengthCallback> callback;
835 MutexAutoLock lock(mMutex);
836 // We have been canceled in the meanwhile.
837 if (!mAsyncInputStreamLengthCallback) {
838 return NS_OK;
841 callback.swap(mAsyncInputStreamLengthCallback);
844 MOZ_ASSERT(callback);
845 return callback->OnInputStreamLengthReady(this, aLength);
848 ////////////////////////////////////////////////////////////////////////////////
849 // nsBufferedOutputStream
851 NS_IMPL_ADDREF_INHERITED(nsBufferedOutputStream, nsBufferedStream)
852 NS_IMPL_RELEASE_INHERITED(nsBufferedOutputStream, nsBufferedStream)
853 // This QI uses NS_INTERFACE_MAP_ENTRY_CONDITIONAL to check for
854 // non-nullness of mSafeStream.
855 NS_INTERFACE_MAP_BEGIN(nsBufferedOutputStream)
856 NS_INTERFACE_MAP_ENTRY(nsIOutputStream)
857 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISafeOutputStream, mSafeStream)
858 NS_INTERFACE_MAP_ENTRY(nsIBufferedOutputStream)
859 NS_INTERFACE_MAP_ENTRY(nsIStreamBufferAccess)
860 NS_INTERFACE_MAP_END_INHERITING(nsBufferedStream)
862 nsresult nsBufferedOutputStream::Create(REFNSIID aIID, void** aResult) {
863 RefPtr<nsBufferedOutputStream> stream = new nsBufferedOutputStream();
864 return stream->QueryInterface(aIID, aResult);
867 NS_IMETHODIMP
868 nsBufferedOutputStream::Init(nsIOutputStream* stream, uint32_t bufferSize) {
869 // QI stream to an nsISafeOutputStream, to see if we should support it
870 mSafeStream = do_QueryInterface(stream);
872 return nsBufferedStream::Init(stream, bufferSize);
875 NS_IMETHODIMP
876 nsBufferedOutputStream::Close() {
877 if (!mStream) {
878 return NS_OK;
881 nsresult rv1, rv2 = NS_OK;
883 rv1 = Flush();
885 #ifdef DEBUG
886 if (NS_FAILED(rv1)) {
887 NS_WARNING(
888 "(debug) Flush() inside nsBufferedOutputStream::Close() returned error "
889 "(rv1).");
891 #endif
893 // If we fail to Flush all the data, then we close anyway and drop the
894 // remaining data in the buffer. We do this because it's what Unix does
895 // for fclose and close. However, we report the error from Flush anyway.
896 if (mStream) {
897 rv2 = Sink()->Close();
898 #ifdef DEBUG
899 if (NS_FAILED(rv2)) {
900 NS_WARNING(
901 "(debug) Sink->Close() inside nsBufferedOutputStream::Close() "
902 "returned error (rv2).");
904 #endif
906 nsBufferedStream::Close();
908 if (NS_FAILED(rv1)) {
909 return rv1;
911 if (NS_FAILED(rv2)) {
912 return rv2;
914 return NS_OK;
917 NS_IMETHODIMP
918 nsBufferedOutputStream::StreamStatus() {
919 return mStream ? Sink()->StreamStatus() : NS_BASE_STREAM_CLOSED;
922 NS_IMETHODIMP
923 nsBufferedOutputStream::Write(const char* buf, uint32_t count,
924 uint32_t* result) {
925 nsresult rv = NS_OK;
926 uint32_t written = 0;
927 *result = 0;
928 if (!mStream) {
929 // We special case this situation.
930 // We should catch the failure, NS_BASE_STREAM_CLOSED ASAP, here.
931 // If we don't, eventually Flush() is called in the while loop below
932 // after so many writes.
933 // However, Flush() returns NS_OK when mStream is null (!!),
934 // and we don't get a meaningful error, NS_BASE_STREAM_CLOSED,
935 // soon enough when we use buffered output.
936 #ifdef DEBUG
937 NS_WARNING(
938 "(info) nsBufferedOutputStream::Write returns NS_BASE_STREAM_CLOSED "
939 "immediately (mStream==null).");
940 #endif
941 return NS_BASE_STREAM_CLOSED;
944 RecursiveMutexAutoLock lock(mBufferMutex);
945 while (count > 0) {
946 uint32_t amt = std::min(count, mBufferSize - mCursor);
947 if (amt > 0) {
948 memcpy(mBuffer + mCursor, buf + written, amt);
949 written += amt;
950 count -= amt;
951 mCursor += amt;
952 if (mFillPoint < mCursor) mFillPoint = mCursor;
953 } else {
954 NS_ASSERTION(mFillPoint, "loop in nsBufferedOutputStream::Write!");
955 rv = Flush();
956 if (NS_FAILED(rv)) {
957 #ifdef DEBUG
958 NS_WARNING(
959 "(debug) Flush() returned error in nsBufferedOutputStream::Write.");
960 #endif
961 break;
965 *result = written;
966 return (written > 0) ? NS_OK : rv;
969 NS_IMETHODIMP
970 nsBufferedOutputStream::Flush() {
971 nsresult rv;
972 uint32_t amt;
973 if (!mStream) {
974 // Stream already cancelled/flushed; probably because of previous error.
975 return NS_OK;
977 // optimize : some code within C-C needs to call Seek -> Flush() often.
978 if (mFillPoint == 0) {
979 return NS_OK;
981 RecursiveMutexAutoLock lock(mBufferMutex);
982 rv = Sink()->Write(mBuffer, mFillPoint, &amt);
983 if (NS_FAILED(rv)) {
984 return rv;
986 mBufferStartOffset += amt;
987 if (amt == mFillPoint) {
988 mFillPoint = mCursor = 0;
989 return NS_OK; // flushed everything
992 // slide the remainder down to the start of the buffer
993 // |<-------------->|<---|----->|
994 // b a c s
995 uint32_t rem = mFillPoint - amt;
996 memmove(mBuffer, mBuffer + amt, rem);
997 mFillPoint = mCursor = rem;
998 return NS_ERROR_FAILURE; // didn't flush all
1001 // nsISafeOutputStream
1002 NS_IMETHODIMP
1003 nsBufferedOutputStream::Finish() {
1004 // flush the stream, to write out any buffered data...
1005 nsresult rv1 = nsBufferedOutputStream::Flush();
1006 nsresult rv2 = NS_OK;
1008 if (NS_FAILED(rv1)) {
1009 NS_WARNING(
1010 "(debug) nsBufferedOutputStream::Flush() failed in "
1011 "nsBufferedOutputStream::Finish()! Possible dataloss.");
1013 rv2 = Sink()->Close();
1014 if (NS_FAILED(rv2)) {
1015 NS_WARNING(
1016 "(debug) Sink()->Close() failed in nsBufferedOutputStream::Finish()! "
1017 "Possible dataloss.");
1019 } else {
1020 rv2 = mSafeStream->Finish();
1021 if (NS_FAILED(rv2)) {
1022 NS_WARNING(
1023 "(debug) mSafeStream->Finish() failed within "
1024 "nsBufferedOutputStream::Flush()! Possible dataloss.");
1028 // ... and close the buffered stream, so any further attempts to flush/close
1029 // the buffered stream won't cause errors.
1030 nsBufferedStream::Close();
1032 // We want to return the errors precisely from Finish()
1033 // and mimick the existing error handling in
1034 // nsBufferedOutputStream::Close() as reference.
1036 if (NS_FAILED(rv1)) {
1037 return rv1;
1039 if (NS_FAILED(rv2)) {
1040 return rv2;
1042 return NS_OK;
1045 NS_IMETHODIMP
1046 nsBufferedOutputStream::WriteFrom(nsIInputStream* inStr, uint32_t count,
1047 uint32_t* _retval) {
1048 return WriteSegments(NS_CopyStreamToSegment, inStr, count, _retval);
1051 NS_IMETHODIMP
1052 nsBufferedOutputStream::WriteSegments(nsReadSegmentFun reader, void* closure,
1053 uint32_t count, uint32_t* _retval) {
1054 *_retval = 0;
1055 nsresult rv;
1056 RecursiveMutexAutoLock lock(mBufferMutex);
1057 while (count > 0) {
1058 uint32_t left = std::min(count, mBufferSize - mCursor);
1059 if (left == 0) {
1060 rv = Flush();
1061 if (NS_FAILED(rv)) {
1062 return (*_retval > 0) ? NS_OK : rv;
1065 continue;
1068 uint32_t read = 0;
1069 rv = reader(this, closure, mBuffer + mCursor, *_retval, left, &read);
1071 if (NS_FAILED(rv)) { // If we have read some data, return ok
1072 return (*_retval > 0) ? NS_OK : rv;
1074 mCursor += read;
1075 *_retval += read;
1076 count -= read;
1077 mFillPoint = std::max(mFillPoint, mCursor);
1079 return NS_OK;
1082 NS_IMETHODIMP
1083 nsBufferedOutputStream::IsNonBlocking(bool* aNonBlocking) {
1084 if (mStream) {
1085 return Sink()->IsNonBlocking(aNonBlocking);
1087 return NS_ERROR_NOT_INITIALIZED;
1090 NS_IMETHODIMP_(char*)
1091 nsBufferedOutputStream::GetBuffer(uint32_t aLength, uint32_t aAlignMask) {
1092 NS_ASSERTION(mGetBufferCount == 0, "nested GetBuffer!");
1093 if (mGetBufferCount != 0) {
1094 return nullptr;
1097 if (mBufferDisabled) {
1098 return nullptr;
1101 RecursiveMutexAutoLock lock(mBufferMutex);
1102 char* buf = mBuffer + mCursor;
1103 uint32_t rem = mBufferSize - mCursor;
1104 if (rem == 0) {
1105 if (NS_FAILED(Flush())) {
1106 return nullptr;
1108 buf = mBuffer + mCursor;
1109 rem = mBufferSize - mCursor;
1112 uint32_t mod = (NS_PTR_TO_INT32(buf) & aAlignMask);
1113 if (mod) {
1114 uint32_t pad = aAlignMask + 1 - mod;
1115 if (pad > rem) {
1116 return nullptr;
1119 memset(buf, 0, pad);
1120 mCursor += pad;
1121 buf += pad;
1122 rem -= pad;
1125 if (aLength > rem) {
1126 return nullptr;
1128 mGetBufferCount++;
1129 return buf;
1132 NS_IMETHODIMP_(void)
1133 nsBufferedOutputStream::PutBuffer(char* aBuffer, uint32_t aLength) {
1134 NS_ASSERTION(mGetBufferCount == 1, "stray PutBuffer!");
1135 if (--mGetBufferCount != 0) {
1136 return;
1139 NS_ASSERTION(mCursor + aLength <= mBufferSize, "PutBuffer botch");
1140 mCursor += aLength;
1141 if (mFillPoint < mCursor) {
1142 mFillPoint = mCursor;
1146 NS_IMETHODIMP
1147 nsBufferedOutputStream::DisableBuffering() {
1148 NS_ASSERTION(!mBufferDisabled, "redundant call to DisableBuffering!");
1149 NS_ASSERTION(mGetBufferCount == 0,
1150 "DisableBuffer call between GetBuffer and PutBuffer!");
1151 if (mGetBufferCount != 0) {
1152 return NS_ERROR_UNEXPECTED;
1155 // Empty the buffer so nsBufferedStream::Tell works.
1156 nsresult rv = Flush();
1157 if (NS_FAILED(rv)) {
1158 return rv;
1161 mBufferDisabled = true;
1162 return NS_OK;
1165 NS_IMETHODIMP
1166 nsBufferedOutputStream::EnableBuffering() {
1167 NS_ASSERTION(mBufferDisabled, "gratuitous call to EnableBuffering!");
1168 mBufferDisabled = false;
1169 return NS_OK;
1172 NS_IMETHODIMP
1173 nsBufferedOutputStream::GetUnbufferedStream(nsISupports** aStream) {
1174 // Empty the buffer so subsequent i/o trumps any buffered data.
1175 if (mFillPoint) {
1176 nsresult rv = Flush();
1177 if (NS_FAILED(rv)) {
1178 return rv;
1182 nsCOMPtr<nsISupports> stream = mStream;
1183 stream.forget(aStream);
1184 return NS_OK;
1187 NS_IMETHODIMP
1188 nsBufferedOutputStream::GetData(nsIOutputStream** aResult) {
1189 nsCOMPtr<nsISupports> stream;
1190 nsBufferedStream::GetData(getter_AddRefs(stream));
1191 nsCOMPtr<nsIOutputStream> outputStream = do_QueryInterface(stream);
1192 outputStream.forget(aResult);
1193 return NS_OK;
1195 #undef METER
1197 ////////////////////////////////////////////////////////////////////////////////