Bumping manifests a=b2g-bump
[gecko.git] / xpcom / io / nsPipe3.cpp
blob327f51daec5502cd61093defa0e8757e553bfd25
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/Attributes.h"
8 #include "mozilla/ReentrantMonitor.h"
9 #include "nsIPipe.h"
10 #include "nsIEventTarget.h"
11 #include "nsISeekableStream.h"
12 #include "nsIProgrammingLanguage.h"
13 #include "nsSegmentedBuffer.h"
14 #include "nsStreamUtils.h"
15 #include "nsCOMPtr.h"
16 #include "nsCRT.h"
17 #include "prlog.h"
18 #include "nsIClassInfoImpl.h"
19 #include "nsAlgorithm.h"
20 #include "nsMemory.h"
21 #include "nsIAsyncInputStream.h"
22 #include "nsIAsyncOutputStream.h"
24 using namespace mozilla;
#ifdef LOG
#undef LOG
#endif
#if defined(PR_LOGGING)
//
// Logging for this file; enable with NSPR_LOG_MODULES=nsPipe:5
//
// Returns the "nsPipe" log module, creating it the first time it is asked for.
static PRLogModuleInfo*
GetPipeLog()
{
  static PRLogModuleInfo* sLog;
  if (sLog == nullptr) {
    sLog = PR_NewLogModule("nsPipe");
  }
  return sLog;
}
#define LOG(args) PR_LOG(GetPipeLog(), PR_LOG_DEBUG, args)
#else
#define LOG(args)
#endif
47 #define DEFAULT_SEGMENT_SIZE 4096
48 #define DEFAULT_SEGMENT_COUNT 16
50 class nsPipe;
51 class nsPipeEvents;
52 class nsPipeInputStream;
53 class nsPipeOutputStream;
55 //-----------------------------------------------------------------------------
57 // this class is used to delay notifications until the end of a particular
58 // scope. it helps avoid the complexity of issuing callbacks while inside
59 // a critical section.
60 class nsPipeEvents
62 public:
63 nsPipeEvents() { }
64 ~nsPipeEvents();
66 inline void NotifyInputReady(nsIAsyncInputStream* aStream,
67 nsIInputStreamCallback* aCallback)
69 NS_ASSERTION(!mInputCallback, "already have an input event");
70 mInputStream = aStream;
71 mInputCallback = aCallback;
74 inline void NotifyOutputReady(nsIAsyncOutputStream* aStream,
75 nsIOutputStreamCallback* aCallback)
77 NS_ASSERTION(!mOutputCallback, "already have an output event");
78 mOutputStream = aStream;
79 mOutputCallback = aCallback;
82 private:
83 nsCOMPtr<nsIAsyncInputStream> mInputStream;
84 nsCOMPtr<nsIInputStreamCallback> mInputCallback;
85 nsCOMPtr<nsIAsyncOutputStream> mOutputStream;
86 nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
89 //-----------------------------------------------------------------------------
91 // the input end of a pipe (allocated as a member of the pipe).
92 class nsPipeInputStream
93 : public nsIAsyncInputStream
94 , public nsISeekableStream
95 , public nsISearchableInputStream
96 , public nsIClassInfo
98 public:
99 // since this class will be allocated as a member of the pipe, we do not
100 // need our own ref count. instead, we share the lifetime (the ref count)
101 // of the entire pipe. this macro is just convenience since it does not
102 // declare a mRefCount variable; however, don't let the name fool you...
103 // we are not inheriting from nsPipe ;-)
104 NS_DECL_ISUPPORTS_INHERITED
106 NS_DECL_NSIINPUTSTREAM
107 NS_DECL_NSIASYNCINPUTSTREAM
108 NS_DECL_NSISEEKABLESTREAM
109 NS_DECL_NSISEARCHABLEINPUTSTREAM
110 NS_DECL_NSICLASSINFO
112 explicit nsPipeInputStream(nsPipe* aPipe)
113 : mPipe(aPipe)
114 , mReaderRefCnt(0)
115 , mLogicalOffset(0)
116 , mBlocking(true)
117 , mBlocked(false)
118 , mAvailable(0)
119 , mCallbackFlags(0)
122 nsresult Fill();
123 void SetNonBlocking(bool aNonBlocking)
125 mBlocking = !aNonBlocking;
128 uint32_t Available()
130 return mAvailable;
132 void ReduceAvailable(uint32_t aAvail)
134 mAvailable -= aAvail;
137 // synchronously wait for the pipe to become readable.
138 nsresult Wait();
140 // these functions return true to indicate that the pipe's monitor should
141 // be notified, to wake up a blocked reader if any.
142 bool OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&);
143 bool OnInputException(nsresult, nsPipeEvents&);
145 private:
146 nsPipe* mPipe;
148 // separate refcnt so that we know when to close the consumer
149 mozilla::ThreadSafeAutoRefCnt mReaderRefCnt;
150 int64_t mLogicalOffset;
151 bool mBlocking;
153 // these variables can only be accessed while inside the pipe's monitor
154 bool mBlocked;
155 uint32_t mAvailable;
156 nsCOMPtr<nsIInputStreamCallback> mCallback;
157 uint32_t mCallbackFlags;
160 //-----------------------------------------------------------------------------
162 // the output end of a pipe (allocated as a member of the pipe).
163 class nsPipeOutputStream
164 : public nsIAsyncOutputStream
165 , public nsIClassInfo
167 public:
168 // since this class will be allocated as a member of the pipe, we do not
169 // need our own ref count. instead, we share the lifetime (the ref count)
170 // of the entire pipe. this macro is just convenience since it does not
171 // declare a mRefCount variable; however, don't let the name fool you...
172 // we are not inheriting from nsPipe ;-)
173 NS_DECL_ISUPPORTS_INHERITED
175 NS_DECL_NSIOUTPUTSTREAM
176 NS_DECL_NSIASYNCOUTPUTSTREAM
177 NS_DECL_NSICLASSINFO
179 explicit nsPipeOutputStream(nsPipe* aPipe)
180 : mPipe(aPipe)
181 , mWriterRefCnt(0)
182 , mLogicalOffset(0)
183 , mBlocking(true)
184 , mBlocked(false)
185 , mWritable(true)
186 , mCallbackFlags(0)
189 void SetNonBlocking(bool aNonBlocking)
191 mBlocking = !aNonBlocking;
193 void SetWritable(bool aWritable)
195 mWritable = aWritable;
198 // synchronously wait for the pipe to become writable.
199 nsresult Wait();
201 // these functions return true to indicate that the pipe's monitor should
202 // be notified, to wake up a blocked writer if any.
203 bool OnOutputWritable(nsPipeEvents&);
204 bool OnOutputException(nsresult, nsPipeEvents&);
206 private:
207 nsPipe* mPipe;
209 // separate refcnt so that we know when to close the producer
210 mozilla::ThreadSafeAutoRefCnt mWriterRefCnt;
211 int64_t mLogicalOffset;
212 bool mBlocking;
214 // these variables can only be accessed while inside the pipe's monitor
215 bool mBlocked;
216 bool mWritable;
217 nsCOMPtr<nsIOutputStreamCallback> mCallback;
218 uint32_t mCallbackFlags;
221 //-----------------------------------------------------------------------------
223 class nsPipe MOZ_FINAL : public nsIPipe
225 public:
226 friend class nsPipeInputStream;
227 friend class nsPipeOutputStream;
229 NS_DECL_THREADSAFE_ISUPPORTS
230 NS_DECL_NSIPIPE
232 // nsPipe methods:
233 nsPipe();
235 private:
236 ~nsPipe();
238 public:
240 // methods below may only be called while inside the pipe's monitor
243 void PeekSegment(uint32_t aIndex, char*& aCursor, char*& aLimit);
246 // methods below may be called while outside the pipe's monitor
249 nsresult GetReadSegment(const char*& aSegment, uint32_t& aSegmentLen);
250 void AdvanceReadCursor(uint32_t aCount);
252 nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
253 void AdvanceWriteCursor(uint32_t aCount);
255 void OnPipeException(nsresult aReason, bool aOutputOnly = false);
257 protected:
258 // We can't inherit from both nsIInputStream and nsIOutputStream
259 // because they collide on their Close method. Consequently we nest their
260 // implementations to avoid the extra object allocation.
261 nsPipeInputStream mInput;
262 nsPipeOutputStream mOutput;
264 ReentrantMonitor mReentrantMonitor;
265 nsSegmentedBuffer mBuffer;
267 char* mReadCursor;
268 char* mReadLimit;
270 int32_t mWriteSegment;
271 char* mWriteCursor;
272 char* mWriteLimit;
274 nsresult mStatus;
275 bool mInited;
279 // NOTES on buffer architecture:
281 // +-----------------+ - - mBuffer.GetSegment(0)
282 // | |
283 // + - - - - - - - - + - - mReadCursor
284 // |/////////////////|
285 // |/////////////////|
286 // |/////////////////|
287 // |/////////////////|
288 // +-----------------+ - - mReadLimit
289 // |
290 // +-----------------+
291 // |/////////////////|
292 // |/////////////////|
293 // |/////////////////|
294 // |/////////////////|
295 // |/////////////////|
296 // |/////////////////|
297 // +-----------------+
298 // |
299 // +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
300 // |/////////////////|
301 // |/////////////////|
302 // |/////////////////|
303 // + - - - - - - - - + - - mWriteCursor
304 // | |
305 // | |
306 // +-----------------+ - - mWriteLimit
308 // (shaded region contains data)
310 // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
311 // small allocations (e.g., 64 byte allocations). this means that buffers may
312 // be allocated back-to-back. in the diagram above, for example, mReadLimit
313 // would actually be pointing at the beginning of the next segment. when
314 // making changes to this file, please keep this fact in mind.
317 //-----------------------------------------------------------------------------
318 // nsPipe methods:
319 //-----------------------------------------------------------------------------
321 nsPipe::nsPipe()
322 : mInput(MOZ_THIS_IN_INITIALIZER_LIST())
323 , mOutput(MOZ_THIS_IN_INITIALIZER_LIST())
324 , mReentrantMonitor("nsPipe.mReentrantMonitor")
325 , mReadCursor(nullptr)
326 , mReadLimit(nullptr)
327 , mWriteSegment(-1)
328 , mWriteCursor(nullptr)
329 , mWriteLimit(nullptr)
330 , mStatus(NS_OK)
331 , mInited(false)
335 nsPipe::~nsPipe()
339 NS_IMPL_ISUPPORTS(nsPipe, nsIPipe)
341 NS_IMETHODIMP
342 nsPipe::Init(bool aNonBlockingIn,
343 bool aNonBlockingOut,
344 uint32_t aSegmentSize,
345 uint32_t aSegmentCount)
347 mInited = true;
349 if (aSegmentSize == 0) {
350 aSegmentSize = DEFAULT_SEGMENT_SIZE;
352 if (aSegmentCount == 0) {
353 aSegmentCount = DEFAULT_SEGMENT_COUNT;
356 // protect against overflow
357 uint32_t maxCount = uint32_t(-1) / aSegmentSize;
358 if (aSegmentCount > maxCount) {
359 aSegmentCount = maxCount;
362 nsresult rv = mBuffer.Init(aSegmentSize, aSegmentSize * aSegmentCount);
363 if (NS_FAILED(rv)) {
364 return rv;
367 mInput.SetNonBlocking(aNonBlockingIn);
368 mOutput.SetNonBlocking(aNonBlockingOut);
369 return NS_OK;
372 NS_IMETHODIMP
373 nsPipe::GetInputStream(nsIAsyncInputStream** aInputStream)
375 NS_ADDREF(*aInputStream = &mInput);
376 return NS_OK;
379 NS_IMETHODIMP
380 nsPipe::GetOutputStream(nsIAsyncOutputStream** aOutputStream)
382 if (NS_WARN_IF(!mInited)) {
383 return NS_ERROR_NOT_INITIALIZED;
385 NS_ADDREF(*aOutputStream = &mOutput);
386 return NS_OK;
389 void
390 nsPipe::PeekSegment(uint32_t aIndex, char*& aCursor, char*& aLimit)
392 if (aIndex == 0) {
393 NS_ASSERTION(!mReadCursor || mBuffer.GetSegmentCount(), "unexpected state");
394 aCursor = mReadCursor;
395 aLimit = mReadLimit;
396 } else {
397 uint32_t numSegments = mBuffer.GetSegmentCount();
398 if (aIndex >= numSegments) {
399 aCursor = aLimit = nullptr;
400 } else {
401 aCursor = mBuffer.GetSegment(aIndex);
402 if (mWriteSegment == (int32_t)aIndex) {
403 aLimit = mWriteCursor;
404 } else {
405 aLimit = aCursor + mBuffer.GetSegmentSize();
411 nsresult
412 nsPipe::GetReadSegment(const char*& aSegment, uint32_t& aSegmentLen)
414 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
416 if (mReadCursor == mReadLimit) {
417 return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
420 aSegment = mReadCursor;
421 aSegmentLen = mReadLimit - mReadCursor;
422 return NS_OK;
425 void
426 nsPipe::AdvanceReadCursor(uint32_t aBytesRead)
428 NS_ASSERTION(aBytesRead, "don't call if no bytes read");
430 nsPipeEvents events;
432 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
434 LOG(("III advancing read cursor by %u\n", aBytesRead));
435 NS_ASSERTION(aBytesRead <= mBuffer.GetSegmentSize(), "read too much");
437 mReadCursor += aBytesRead;
438 NS_ASSERTION(mReadCursor <= mReadLimit, "read cursor exceeds limit");
440 mInput.ReduceAvailable(aBytesRead);
442 if (mReadCursor == mReadLimit) {
443 // we've reached the limit of how much we can read from this segment.
444 // if at the end of this segment, then we must discard this segment.
446 // if still writing in this segment then bail because we're not done
447 // with the segment and have to wait for now...
448 if (mWriteSegment == 0 && mWriteLimit > mWriteCursor) {
449 NS_ASSERTION(mReadLimit == mWriteCursor, "unexpected state");
450 return;
453 // shift write segment index (-1 indicates an empty buffer).
454 --mWriteSegment;
456 // done with this segment
457 mBuffer.DeleteFirstSegment();
458 LOG(("III deleting first segment\n"));
460 if (mWriteSegment == -1) {
461 // buffer is completely empty
462 mReadCursor = nullptr;
463 mReadLimit = nullptr;
464 mWriteCursor = nullptr;
465 mWriteLimit = nullptr;
466 } else {
467 // advance read cursor and limit to next buffer segment
468 mReadCursor = mBuffer.GetSegment(0);
469 if (mWriteSegment == 0) {
470 mReadLimit = mWriteCursor;
471 } else {
472 mReadLimit = mReadCursor + mBuffer.GetSegmentSize();
476 // we've free'd up a segment, so notify output stream that pipe has
477 // room for a new segment.
478 if (mOutput.OnOutputWritable(events)) {
479 mon.Notify();
485 nsresult
486 nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen)
488 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
490 if (NS_FAILED(mStatus)) {
491 return mStatus;
494 // write cursor and limit may both be null indicating an empty buffer.
495 if (mWriteCursor == mWriteLimit) {
496 char* seg = mBuffer.AppendNewSegment();
497 // pipe is full
498 if (!seg) {
499 return NS_BASE_STREAM_WOULD_BLOCK;
501 LOG(("OOO appended new segment\n"));
502 mWriteCursor = seg;
503 mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
504 ++mWriteSegment;
507 // make sure read cursor is initialized
508 if (!mReadCursor) {
509 NS_ASSERTION(mWriteSegment == 0, "unexpected null read cursor");
510 mReadCursor = mReadLimit = mWriteCursor;
513 // check to see if we can roll-back our read and write cursors to the
514 // beginning of the current/first segment. this is purely an optimization.
515 if (mReadCursor == mWriteCursor && mWriteSegment == 0) {
516 char* head = mBuffer.GetSegment(0);
517 LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head));
518 mWriteCursor = mReadCursor = mReadLimit = head;
521 aSegment = mWriteCursor;
522 aSegmentLen = mWriteLimit - mWriteCursor;
523 return NS_OK;
526 void
527 nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten)
529 NS_ASSERTION(aBytesWritten, "don't call if no bytes written");
531 nsPipeEvents events;
533 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
535 LOG(("OOO advancing write cursor by %u\n", aBytesWritten));
537 char* newWriteCursor = mWriteCursor + aBytesWritten;
538 NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit");
540 // update read limit if reading in the same segment
541 if (mWriteSegment == 0 && mReadLimit == mWriteCursor) {
542 mReadLimit = newWriteCursor;
545 mWriteCursor = newWriteCursor;
547 // The only way mReadCursor == mWriteCursor is if:
549 // - mReadCursor is at the start of a segment (which, based on how
550 // nsSegmentedBuffer works, means that this segment is the "first"
551 // segment)
552 // - mWriteCursor points at the location past the end of the current
553 // write segment (so the current write filled the current write
554 // segment, so we've incremented mWriteCursor to point past the end
555 // of it)
556 // - the segment to which data has just been written is located
557 // exactly one segment's worth of bytes before the first segment
558 // where mReadCursor is located
560 // Consequently, the byte immediately after the end of the current
561 // write segment is the first byte of the first segment, so
562 // mReadCursor == mWriteCursor. (Another way to think about this is
563 // to consider the buffer architecture diagram above, but consider it
564 // with an arena allocator which allocates from the *end* of the
565 // arena to the *beginning* of the arena.)
566 NS_ASSERTION(mReadCursor != mWriteCursor ||
567 (mBuffer.GetSegment(0) == mReadCursor &&
568 mWriteCursor == mWriteLimit),
569 "read cursor is bad");
571 // update the writable flag on the output stream
572 if (mWriteCursor == mWriteLimit) {
573 if (mBuffer.GetSize() >= mBuffer.GetMaxSize()) {
574 mOutput.SetWritable(false);
578 // notify input stream that pipe now contains additional data
579 if (mInput.OnInputReadable(aBytesWritten, events)) {
580 mon.Notify();
585 void
586 nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly)
588 LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n",
589 aReason, aOutputOnly));
591 nsPipeEvents events;
593 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
595 // if we've already hit an exception, then ignore this one.
596 if (NS_FAILED(mStatus)) {
597 return;
600 mStatus = aReason;
602 // an output-only exception applies to the input end if the pipe has
603 // zero bytes available.
604 if (aOutputOnly && !mInput.Available()) {
605 aOutputOnly = false;
608 if (!aOutputOnly)
609 if (mInput.OnInputException(aReason, events)) {
610 mon.Notify();
613 if (mOutput.OnOutputException(aReason, events)) {
614 mon.Notify();
619 //-----------------------------------------------------------------------------
620 // nsPipeEvents methods:
621 //-----------------------------------------------------------------------------
623 nsPipeEvents::~nsPipeEvents()
625 // dispatch any pending events
627 if (mInputCallback) {
628 mInputCallback->OnInputStreamReady(mInputStream);
629 mInputCallback = 0;
630 mInputStream = 0;
632 if (mOutputCallback) {
633 mOutputCallback->OnOutputStreamReady(mOutputStream);
634 mOutputCallback = 0;
635 mOutputStream = 0;
639 //-----------------------------------------------------------------------------
640 // nsPipeInputStream methods:
641 //-----------------------------------------------------------------------------
643 NS_IMPL_QUERY_INTERFACE(nsPipeInputStream,
644 nsIInputStream,
645 nsIAsyncInputStream,
646 nsISeekableStream,
647 nsISearchableInputStream,
648 nsIClassInfo)
650 NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream,
651 nsIInputStream,
652 nsIAsyncInputStream,
653 nsISeekableStream,
654 nsISearchableInputStream)
656 NS_IMPL_THREADSAFE_CI(nsPipeInputStream)
658 nsresult
659 nsPipeInputStream::Wait()
661 NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream");
663 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
665 while (NS_SUCCEEDED(mPipe->mStatus) && (mAvailable == 0)) {
666 LOG(("III pipe input: waiting for data\n"));
668 mBlocked = true;
669 mon.Wait();
670 mBlocked = false;
672 LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n",
673 mPipe->mStatus, mAvailable));
676 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
679 bool
680 nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, nsPipeEvents& aEvents)
682 bool result = false;
684 mAvailable += aBytesWritten;
686 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
687 aEvents.NotifyInputReady(this, mCallback);
688 mCallback = 0;
689 mCallbackFlags = 0;
690 } else if (mBlocked) {
691 result = true;
694 return result;
697 bool
698 nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents)
700 LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
701 this, aReason));
703 bool result = false;
705 NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception");
707 // force count of available bytes to zero.
708 mAvailable = 0;
710 if (mCallback) {
711 aEvents.NotifyInputReady(this, mCallback);
712 mCallback = 0;
713 mCallbackFlags = 0;
714 } else if (mBlocked) {
715 result = true;
718 return result;
721 NS_IMETHODIMP_(MozExternalRefCountType)
722 nsPipeInputStream::AddRef(void)
724 ++mReaderRefCnt;
725 return mPipe->AddRef();
728 NS_IMETHODIMP_(MozExternalRefCountType)
729 nsPipeInputStream::Release(void)
731 if (--mReaderRefCnt == 0) {
732 Close();
734 return mPipe->Release();
737 NS_IMETHODIMP
738 nsPipeInputStream::CloseWithStatus(nsresult aReason)
740 LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, aReason));
742 if (NS_SUCCEEDED(aReason)) {
743 aReason = NS_BASE_STREAM_CLOSED;
746 mPipe->OnPipeException(aReason);
747 return NS_OK;
750 NS_IMETHODIMP
751 nsPipeInputStream::Close()
753 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
756 NS_IMETHODIMP
757 nsPipeInputStream::Available(uint64_t* aResult)
759 // nsPipeInputStream supports under 4GB stream only
760 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
762 // return error if pipe closed
763 if (!mAvailable && NS_FAILED(mPipe->mStatus)) {
764 return mPipe->mStatus;
767 *aResult = (uint64_t)mAvailable;
768 return NS_OK;
771 NS_IMETHODIMP
772 nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter,
773 void* aClosure,
774 uint32_t aCount,
775 uint32_t* aReadCount)
777 LOG(("III ReadSegments [this=%x count=%u]\n", this, aCount));
779 nsresult rv = NS_OK;
781 const char* segment;
782 uint32_t segmentLen;
784 *aReadCount = 0;
785 while (aCount) {
786 rv = mPipe->GetReadSegment(segment, segmentLen);
787 if (NS_FAILED(rv)) {
788 // ignore this error if we've already read something.
789 if (*aReadCount > 0) {
790 rv = NS_OK;
791 break;
793 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
794 // pipe is empty
795 if (!mBlocking) {
796 break;
798 // wait for some data to be written to the pipe
799 rv = Wait();
800 if (NS_SUCCEEDED(rv)) {
801 continue;
804 // ignore this error, just return.
805 if (rv == NS_BASE_STREAM_CLOSED) {
806 rv = NS_OK;
807 break;
809 mPipe->OnPipeException(rv);
810 break;
813 // read no more than aCount
814 if (segmentLen > aCount) {
815 segmentLen = aCount;
818 uint32_t writeCount, originalLen = segmentLen;
819 while (segmentLen) {
820 writeCount = 0;
822 rv = aWriter(this, aClosure, segment, *aReadCount, segmentLen, &writeCount);
824 if (NS_FAILED(rv) || writeCount == 0) {
825 aCount = 0;
826 // any errors returned from the writer end here: do not
827 // propagate to the caller of ReadSegments.
828 rv = NS_OK;
829 break;
832 NS_ASSERTION(writeCount <= segmentLen, "wrote more than expected");
833 segment += writeCount;
834 segmentLen -= writeCount;
835 aCount -= writeCount;
836 *aReadCount += writeCount;
837 mLogicalOffset += writeCount;
840 if (segmentLen < originalLen) {
841 mPipe->AdvanceReadCursor(originalLen - segmentLen);
845 return rv;
848 NS_IMETHODIMP
849 nsPipeInputStream::Read(char* aToBuf, uint32_t aBufLen, uint32_t* aReadCount)
851 return ReadSegments(NS_CopySegmentToBuffer, aToBuf, aBufLen, aReadCount);
854 NS_IMETHODIMP
855 nsPipeInputStream::IsNonBlocking(bool* aNonBlocking)
857 *aNonBlocking = !mBlocking;
858 return NS_OK;
861 NS_IMETHODIMP
862 nsPipeInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
863 uint32_t aFlags,
864 uint32_t aRequestedCount,
865 nsIEventTarget* aTarget)
867 LOG(("III AsyncWait [this=%x]\n", this));
869 nsPipeEvents pipeEvents;
871 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
873 // replace a pending callback
874 mCallback = 0;
875 mCallbackFlags = 0;
877 if (!aCallback) {
878 return NS_OK;
881 nsCOMPtr<nsIInputStreamCallback> proxy;
882 if (aTarget) {
883 proxy = NS_NewInputStreamReadyEvent(aCallback, aTarget);
884 aCallback = proxy;
887 if (NS_FAILED(mPipe->mStatus) ||
888 (mAvailable && !(aFlags & WAIT_CLOSURE_ONLY))) {
889 // stream is already closed or readable; post event.
890 pipeEvents.NotifyInputReady(this, aCallback);
891 } else {
892 // queue up callback object to be notified when data becomes available
893 mCallback = aCallback;
894 mCallbackFlags = aFlags;
897 return NS_OK;
900 NS_IMETHODIMP
901 nsPipeInputStream::Seek(int32_t aWhence, int64_t aOffset)
903 NS_NOTREACHED("nsPipeInputStream::Seek");
904 return NS_ERROR_NOT_IMPLEMENTED;
907 NS_IMETHODIMP
908 nsPipeInputStream::Tell(int64_t* aOffset)
910 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
912 // return error if pipe closed
913 if (!mAvailable && NS_FAILED(mPipe->mStatus)) {
914 return mPipe->mStatus;
917 *aOffset = mLogicalOffset;
918 return NS_OK;
921 NS_IMETHODIMP
922 nsPipeInputStream::SetEOF()
924 NS_NOTREACHED("nsPipeInputStream::SetEOF");
925 return NS_ERROR_NOT_IMPLEMENTED;
928 #define COMPARE(s1, s2, i) \
929 (aIgnoreCase \
930 ? nsCRT::strncasecmp((const char *)s1, (const char *)s2, (uint32_t)i) \
931 : nsCRT::strncmp((const char *)s1, (const char *)s2, (uint32_t)i))
933 NS_IMETHODIMP
934 nsPipeInputStream::Search(const char* aForString,
935 bool aIgnoreCase,
936 bool* aFound,
937 uint32_t* aOffsetSearchedTo)
939 LOG(("III Search [for=%s ic=%u]\n", aForString, aIgnoreCase));
941 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
943 char* cursor1;
944 char* limit1;
945 uint32_t index = 0, offset = 0;
946 uint32_t strLen = strlen(aForString);
948 mPipe->PeekSegment(0, cursor1, limit1);
949 if (cursor1 == limit1) {
950 *aFound = false;
951 *aOffsetSearchedTo = 0;
952 LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
953 return NS_OK;
956 while (true) {
957 uint32_t i, len1 = limit1 - cursor1;
959 // check if the string is in the buffer segment
960 for (i = 0; i < len1 - strLen + 1; i++) {
961 if (COMPARE(&cursor1[i], aForString, strLen) == 0) {
962 *aFound = true;
963 *aOffsetSearchedTo = offset + i;
964 LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
965 return NS_OK;
969 // get the next segment
970 char* cursor2;
971 char* limit2;
972 uint32_t len2;
974 index++;
975 offset += len1;
977 mPipe->PeekSegment(index, cursor2, limit2);
978 if (cursor2 == limit2) {
979 *aFound = false;
980 *aOffsetSearchedTo = offset - strLen + 1;
981 LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
982 return NS_OK;
984 len2 = limit2 - cursor2;
986 // check if the string is straddling the next buffer segment
987 uint32_t lim = XPCOM_MIN(strLen, len2 + 1);
988 for (i = 0; i < lim; ++i) {
989 uint32_t strPart1Len = strLen - i - 1;
990 uint32_t strPart2Len = strLen - strPart1Len;
991 const char* strPart2 = &aForString[strLen - strPart2Len];
992 uint32_t bufSeg1Offset = len1 - strPart1Len;
993 if (COMPARE(&cursor1[bufSeg1Offset], aForString, strPart1Len) == 0 &&
994 COMPARE(cursor2, strPart2, strPart2Len) == 0) {
995 *aFound = true;
996 *aOffsetSearchedTo = offset - strPart1Len;
997 LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
998 return NS_OK;
1002 // finally continue with the next buffer
1003 cursor1 = cursor2;
1004 limit1 = limit2;
1007 NS_NOTREACHED("can't get here");
1008 return NS_ERROR_UNEXPECTED; // keep compiler happy
1011 //-----------------------------------------------------------------------------
1012 // nsPipeOutputStream methods:
1013 //-----------------------------------------------------------------------------
1015 NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream,
1016 nsIOutputStream,
1017 nsIAsyncOutputStream,
1018 nsIClassInfo)
1020 NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream,
1021 nsIOutputStream,
1022 nsIAsyncOutputStream)
1024 NS_IMPL_THREADSAFE_CI(nsPipeOutputStream)
1026 nsresult
1027 nsPipeOutputStream::Wait()
1029 NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream");
1031 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1033 if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) {
1034 LOG(("OOO pipe output: waiting for space\n"));
1035 mBlocked = true;
1036 mon.Wait();
1037 mBlocked = false;
1038 LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n",
1039 mPipe->mStatus, mWritable));
1042 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
1045 bool
1046 nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents)
1048 bool result = false;
1050 mWritable = true;
1052 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
1053 aEvents.NotifyOutputReady(this, mCallback);
1054 mCallback = 0;
1055 mCallbackFlags = 0;
1056 } else if (mBlocked) {
1057 result = true;
1060 return result;
1063 bool
1064 nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents)
1066 LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n",
1067 this, aReason));
1069 bool result = false;
1071 NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception");
1072 mWritable = false;
1074 if (mCallback) {
1075 aEvents.NotifyOutputReady(this, mCallback);
1076 mCallback = 0;
1077 mCallbackFlags = 0;
1078 } else if (mBlocked) {
1079 result = true;
1082 return result;
1086 NS_IMETHODIMP_(MozExternalRefCountType)
1087 nsPipeOutputStream::AddRef()
1089 ++mWriterRefCnt;
1090 return mPipe->AddRef();
1093 NS_IMETHODIMP_(MozExternalRefCountType)
1094 nsPipeOutputStream::Release()
1096 if (--mWriterRefCnt == 0) {
1097 Close();
1099 return mPipe->Release();
1102 NS_IMETHODIMP
1103 nsPipeOutputStream::CloseWithStatus(nsresult aReason)
1105 LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, aReason));
1107 if (NS_SUCCEEDED(aReason)) {
1108 aReason = NS_BASE_STREAM_CLOSED;
1111 // input stream may remain open
1112 mPipe->OnPipeException(aReason, true);
1113 return NS_OK;
1116 NS_IMETHODIMP
1117 nsPipeOutputStream::Close()
1119 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
1122 NS_IMETHODIMP
1123 nsPipeOutputStream::WriteSegments(nsReadSegmentFun aReader,
1124 void* aClosure,
1125 uint32_t aCount,
1126 uint32_t* aWriteCount)
1128 LOG(("OOO WriteSegments [this=%x count=%u]\n", this, aCount));
1130 nsresult rv = NS_OK;
1132 char* segment;
1133 uint32_t segmentLen;
1135 *aWriteCount = 0;
1136 while (aCount) {
1137 rv = mPipe->GetWriteSegment(segment, segmentLen);
1138 if (NS_FAILED(rv)) {
1139 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
1140 // pipe is full
1141 if (!mBlocking) {
1142 // ignore this error if we've already written something
1143 if (*aWriteCount > 0) {
1144 rv = NS_OK;
1146 break;
1148 // wait for the pipe to have an empty segment.
1149 rv = Wait();
1150 if (NS_SUCCEEDED(rv)) {
1151 continue;
1154 mPipe->OnPipeException(rv);
1155 break;
1158 // write no more than aCount
1159 if (segmentLen > aCount) {
1160 segmentLen = aCount;
1163 uint32_t readCount, originalLen = segmentLen;
1164 while (segmentLen) {
1165 readCount = 0;
1167 rv = aReader(this, aClosure, segment, *aWriteCount, segmentLen, &readCount);
1169 if (NS_FAILED(rv) || readCount == 0) {
1170 aCount = 0;
1171 // any errors returned from the aReader end here: do not
1172 // propagate to the caller of WriteSegments.
1173 rv = NS_OK;
1174 break;
1177 NS_ASSERTION(readCount <= segmentLen, "read more than expected");
1178 segment += readCount;
1179 segmentLen -= readCount;
1180 aCount -= readCount;
1181 *aWriteCount += readCount;
1182 mLogicalOffset += readCount;
1185 if (segmentLen < originalLen) {
1186 mPipe->AdvanceWriteCursor(originalLen - segmentLen);
1190 return rv;
1193 static NS_METHOD
1194 nsReadFromRawBuffer(nsIOutputStream* aOutStr,
1195 void* aClosure,
1196 char* aToRawSegment,
1197 uint32_t aOffset,
1198 uint32_t aCount,
1199 uint32_t* aReadCount)
1201 const char* fromBuf = (const char*)aClosure;
1202 memcpy(aToRawSegment, &fromBuf[aOffset], aCount);
1203 *aReadCount = aCount;
1204 return NS_OK;
1207 NS_IMETHODIMP
1208 nsPipeOutputStream::Write(const char* aFromBuf,
1209 uint32_t aBufLen,
1210 uint32_t* aWriteCount)
1212 return WriteSegments(nsReadFromRawBuffer, (void*)aFromBuf, aBufLen, aWriteCount);
1215 NS_IMETHODIMP
1216 nsPipeOutputStream::Flush(void)
1218 // nothing to do
1219 return NS_OK;
1222 static NS_METHOD
1223 nsReadFromInputStream(nsIOutputStream* aOutStr,
1224 void* aClosure,
1225 char* aToRawSegment,
1226 uint32_t aOffset,
1227 uint32_t aCount,
1228 uint32_t* aReadCount)
1230 nsIInputStream* fromStream = (nsIInputStream*)aClosure;
1231 return fromStream->Read(aToRawSegment, aCount, aReadCount);
1234 NS_IMETHODIMP
1235 nsPipeOutputStream::WriteFrom(nsIInputStream* aFromStream,
1236 uint32_t aCount,
1237 uint32_t* aWriteCount)
1239 return WriteSegments(nsReadFromInputStream, aFromStream, aCount, aWriteCount);
1242 NS_IMETHODIMP
1243 nsPipeOutputStream::IsNonBlocking(bool* aNonBlocking)
1245 *aNonBlocking = !mBlocking;
1246 return NS_OK;
1249 NS_IMETHODIMP
1250 nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback* aCallback,
1251 uint32_t aFlags,
1252 uint32_t aRequestedCount,
1253 nsIEventTarget* aTarget)
1255 LOG(("OOO AsyncWait [this=%x]\n", this));
1257 nsPipeEvents pipeEvents;
1259 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1261 // replace a pending callback
1262 mCallback = 0;
1263 mCallbackFlags = 0;
1265 if (!aCallback) {
1266 return NS_OK;
1269 nsCOMPtr<nsIOutputStreamCallback> proxy;
1270 if (aTarget) {
1271 proxy = NS_NewOutputStreamReadyEvent(aCallback, aTarget);
1272 aCallback = proxy;
1275 if (NS_FAILED(mPipe->mStatus) ||
1276 (mWritable && !(aFlags & WAIT_CLOSURE_ONLY))) {
1277 // stream is already closed or writable; post event.
1278 pipeEvents.NotifyOutputReady(this, aCallback);
1279 } else {
1280 // queue up callback object to be notified when data becomes available
1281 mCallback = aCallback;
1282 mCallbackFlags = aFlags;
1285 return NS_OK;
1288 ////////////////////////////////////////////////////////////////////////////////
1290 nsresult
1291 NS_NewPipe(nsIInputStream** aPipeIn,
1292 nsIOutputStream** aPipeOut,
1293 uint32_t aSegmentSize,
1294 uint32_t aMaxSize,
1295 bool aNonBlockingInput,
1296 bool aNonBlockingOutput)
1298 if (aSegmentSize == 0) {
1299 aSegmentSize = DEFAULT_SEGMENT_SIZE;
1302 // Handle aMaxSize of UINT32_MAX as a special case
1303 uint32_t segmentCount;
1304 if (aMaxSize == UINT32_MAX) {
1305 segmentCount = UINT32_MAX;
1306 } else {
1307 segmentCount = aMaxSize / aSegmentSize;
1310 nsIAsyncInputStream* in;
1311 nsIAsyncOutputStream* out;
1312 nsresult rv = NS_NewPipe2(&in, &out, aNonBlockingInput, aNonBlockingOutput,
1313 aSegmentSize, segmentCount);
1314 if (NS_FAILED(rv)) {
1315 return rv;
1318 *aPipeIn = in;
1319 *aPipeOut = out;
1320 return NS_OK;
1323 nsresult
1324 NS_NewPipe2(nsIAsyncInputStream** aPipeIn,
1325 nsIAsyncOutputStream** aPipeOut,
1326 bool aNonBlockingInput,
1327 bool aNonBlockingOutput,
1328 uint32_t aSegmentSize,
1329 uint32_t aSegmentCount)
1331 nsresult rv;
1333 nsPipe* pipe = new nsPipe();
1334 if (!pipe) {
1335 return NS_ERROR_OUT_OF_MEMORY;
1338 rv = pipe->Init(aNonBlockingInput,
1339 aNonBlockingOutput,
1340 aSegmentSize,
1341 aSegmentCount);
1342 if (NS_FAILED(rv)) {
1343 NS_ADDREF(pipe);
1344 NS_RELEASE(pipe);
1345 return rv;
1348 pipe->GetInputStream(aPipeIn);
1349 pipe->GetOutputStream(aPipeOut);
1350 return NS_OK;
1353 nsresult
1354 nsPipeConstructor(nsISupports* aOuter, REFNSIID aIID, void** aResult)
1356 if (aOuter) {
1357 return NS_ERROR_NO_AGGREGATION;
1359 nsPipe* pipe = new nsPipe();
1360 if (!pipe) {
1361 return NS_ERROR_OUT_OF_MEMORY;
1363 NS_ADDREF(pipe);
1364 nsresult rv = pipe->QueryInterface(aIID, aResult);
1365 NS_RELEASE(pipe);
1366 return rv;
1369 ////////////////////////////////////////////////////////////////////////////////