1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/Attributes.h"
8 #include "mozilla/ReentrantMonitor.h"
10 #include "nsIEventTarget.h"
11 #include "nsISeekableStream.h"
12 #include "nsIProgrammingLanguage.h"
13 #include "nsSegmentedBuffer.h"
14 #include "nsStreamUtils.h"
18 #include "nsIClassInfoImpl.h"
19 #include "nsAlgorithm.h"
21 #include "nsIAsyncInputStream.h"
22 #include "nsIAsyncOutputStream.h"
24 using namespace mozilla
;
29 #if defined(PR_LOGGING)
31 // set NSPR_LOG_MODULES=nsPipe:5
33 static PRLogModuleInfo
*
36 static PRLogModuleInfo
* sLog
;
38 sLog
= PR_NewLogModule("nsPipe");
42 #define LOG(args) PR_LOG(GetPipeLog(), PR_LOG_DEBUG, args)
47 #define DEFAULT_SEGMENT_SIZE 4096
48 #define DEFAULT_SEGMENT_COUNT 16
52 class nsPipeInputStream
;
53 class nsPipeOutputStream
;
55 //-----------------------------------------------------------------------------
57 // this class is used to delay notifications until the end of a particular
58 // scope. it helps avoid the complexity of issuing callbacks while inside
59 // a critical section.
66 inline void NotifyInputReady(nsIAsyncInputStream
* aStream
,
67 nsIInputStreamCallback
* aCallback
)
69 NS_ASSERTION(!mInputCallback
, "already have an input event");
70 mInputStream
= aStream
;
71 mInputCallback
= aCallback
;
74 inline void NotifyOutputReady(nsIAsyncOutputStream
* aStream
,
75 nsIOutputStreamCallback
* aCallback
)
77 NS_ASSERTION(!mOutputCallback
, "already have an output event");
78 mOutputStream
= aStream
;
79 mOutputCallback
= aCallback
;
83 nsCOMPtr
<nsIAsyncInputStream
> mInputStream
;
84 nsCOMPtr
<nsIInputStreamCallback
> mInputCallback
;
85 nsCOMPtr
<nsIAsyncOutputStream
> mOutputStream
;
86 nsCOMPtr
<nsIOutputStreamCallback
> mOutputCallback
;
89 //-----------------------------------------------------------------------------
91 // the input end of a pipe (allocated as a member of the pipe).
92 class nsPipeInputStream
93 : public nsIAsyncInputStream
94 , public nsISeekableStream
95 , public nsISearchableInputStream
99 // since this class will be allocated as a member of the pipe, we do not
100 // need our own ref count. instead, we share the lifetime (the ref count)
101 // of the entire pipe. this macro is just convenience since it does not
102 // declare a mRefCount variable; however, don't let the name fool you...
103 // we are not inheriting from nsPipe ;-)
104 NS_DECL_ISUPPORTS_INHERITED
106 NS_DECL_NSIINPUTSTREAM
107 NS_DECL_NSIASYNCINPUTSTREAM
108 NS_DECL_NSISEEKABLESTREAM
109 NS_DECL_NSISEARCHABLEINPUTSTREAM
112 explicit nsPipeInputStream(nsPipe
* aPipe
)
123 void SetNonBlocking(bool aNonBlocking
)
125 mBlocking
= !aNonBlocking
;
132 void ReduceAvailable(uint32_t aAvail
)
134 mAvailable
-= aAvail
;
137 // synchronously wait for the pipe to become readable.
140 // these functions return true to indicate that the pipe's monitor should
141 // be notified, to wake up a blocked reader if any.
142 bool OnInputReadable(uint32_t aBytesWritten
, nsPipeEvents
&);
143 bool OnInputException(nsresult
, nsPipeEvents
&);
148 // separate refcnt so that we know when to close the consumer
149 mozilla::ThreadSafeAutoRefCnt mReaderRefCnt
;
150 int64_t mLogicalOffset
;
153 // these variables can only be accessed while inside the pipe's monitor
156 nsCOMPtr
<nsIInputStreamCallback
> mCallback
;
157 uint32_t mCallbackFlags
;
160 //-----------------------------------------------------------------------------
162 // the output end of a pipe (allocated as a member of the pipe).
163 class nsPipeOutputStream
164 : public nsIAsyncOutputStream
165 , public nsIClassInfo
168 // since this class will be allocated as a member of the pipe, we do not
169 // need our own ref count. instead, we share the lifetime (the ref count)
170 // of the entire pipe. this macro is just convenience since it does not
171 // declare a mRefCount variable; however, don't let the name fool you...
172 // we are not inheriting from nsPipe ;-)
173 NS_DECL_ISUPPORTS_INHERITED
175 NS_DECL_NSIOUTPUTSTREAM
176 NS_DECL_NSIASYNCOUTPUTSTREAM
179 explicit nsPipeOutputStream(nsPipe
* aPipe
)
189 void SetNonBlocking(bool aNonBlocking
)
191 mBlocking
= !aNonBlocking
;
193 void SetWritable(bool aWritable
)
195 mWritable
= aWritable
;
198 // synchronously wait for the pipe to become writable.
201 // these functions return true to indicate that the pipe's monitor should
202 // be notified, to wake up a blocked writer if any.
203 bool OnOutputWritable(nsPipeEvents
&);
204 bool OnOutputException(nsresult
, nsPipeEvents
&);
209 // separate refcnt so that we know when to close the producer
210 mozilla::ThreadSafeAutoRefCnt mWriterRefCnt
;
211 int64_t mLogicalOffset
;
214 // these variables can only be accessed while inside the pipe's monitor
217 nsCOMPtr
<nsIOutputStreamCallback
> mCallback
;
218 uint32_t mCallbackFlags
;
221 //-----------------------------------------------------------------------------
223 class nsPipe MOZ_FINAL
: public nsIPipe
226 friend class nsPipeInputStream
;
227 friend class nsPipeOutputStream
;
229 NS_DECL_THREADSAFE_ISUPPORTS
240 // methods below may only be called while inside the pipe's monitor
243 void PeekSegment(uint32_t aIndex
, char*& aCursor
, char*& aLimit
);
246 // methods below may be called while outside the pipe's monitor
249 nsresult
GetReadSegment(const char*& aSegment
, uint32_t& aSegmentLen
);
250 void AdvanceReadCursor(uint32_t aCount
);
252 nsresult
GetWriteSegment(char*& aSegment
, uint32_t& aSegmentLen
);
253 void AdvanceWriteCursor(uint32_t aCount
);
255 void OnPipeException(nsresult aReason
, bool aOutputOnly
= false);
258 // We can't inherit from both nsIInputStream and nsIOutputStream
259 // because they collide on their Close method. Consequently we nest their
260 // implementations to avoid the extra object allocation.
261 nsPipeInputStream mInput
;
262 nsPipeOutputStream mOutput
;
264 ReentrantMonitor mReentrantMonitor
;
265 nsSegmentedBuffer mBuffer
;
270 int32_t mWriteSegment
;
279 // NOTES on buffer architecture:
281 // +-----------------+ - - mBuffer.GetSegment(0)
283 // + - - - - - - - - + - - mReadCursor
284 // |/////////////////|
285 // |/////////////////|
286 // |/////////////////|
287 // |/////////////////|
288 // +-----------------+ - - mReadLimit
290 // +-----------------+
291 // |/////////////////|
292 // |/////////////////|
293 // |/////////////////|
294 // |/////////////////|
295 // |/////////////////|
296 // |/////////////////|
297 // +-----------------+
299 // +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
300 // |/////////////////|
301 // |/////////////////|
302 // |/////////////////|
303 // + - - - - - - - - + - - mWriteCursor
306 // +-----------------+ - - mWriteLimit
308 // (shaded region contains data)
310 // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
311 // small allocations (e.g., 64 byte allocations). this means that buffers may
312 // be allocated back-to-back. in the diagram above, for example, mReadLimit
313 // would actually be pointing at the beginning of the next segment. when
314 // making changes to this file, please keep this fact in mind.
317 //-----------------------------------------------------------------------------
319 //-----------------------------------------------------------------------------
322 : mInput(MOZ_THIS_IN_INITIALIZER_LIST())
323 , mOutput(MOZ_THIS_IN_INITIALIZER_LIST())
324 , mReentrantMonitor("nsPipe.mReentrantMonitor")
325 , mReadCursor(nullptr)
326 , mReadLimit(nullptr)
328 , mWriteCursor(nullptr)
329 , mWriteLimit(nullptr)
339 NS_IMPL_ISUPPORTS(nsPipe
, nsIPipe
)
// Configures the pipe before use.
//
// aNonBlockingIn / aNonBlockingOut are forwarded to SetNonBlocking() on the
// input and output ends respectively.
// aSegmentSize / aSegmentCount: a value of 0 selects DEFAULT_SEGMENT_SIZE /
// DEFAULT_SEGMENT_COUNT.
//
// The result of mBuffer.Init() is captured in rv; how it is propagated is not
// visible in this excerpt.
342 nsPipe::Init(bool aNonBlockingIn
,
343 bool aNonBlockingOut
,
344 uint32_t aSegmentSize
,
345 uint32_t aSegmentCount
)
// substitute the defaults for "unspecified" (zero) arguments.
349 if (aSegmentSize
== 0) {
350 aSegmentSize
= DEFAULT_SEGMENT_SIZE
;
352 if (aSegmentCount
== 0) {
353 aSegmentCount
= DEFAULT_SEGMENT_COUNT
;
356 // protect against overflow: clamp the count so that the product
// aSegmentSize * aSegmentCount computed below cannot wrap a uint32_t.
357 uint32_t maxCount
= uint32_t(-1) / aSegmentSize
;
358 if (aSegmentCount
> maxCount
) {
359 aSegmentCount
= maxCount
;
// second argument is the total byte budget (segment size times count);
// presumably this is what mBuffer.GetMaxSize() reports later -- confirm
// against nsSegmentedBuffer.
362 nsresult rv
= mBuffer
.Init(aSegmentSize
, aSegmentSize
* aSegmentCount
);
// note the inversion: the stream ends store "blocking", callers pass
// "non-blocking" (see SetNonBlocking on both stream classes).
367 mInput
.SetNonBlocking(aNonBlockingIn
);
368 mOutput
.SetNonBlocking(aNonBlockingOut
);
// Hands out the input end of the pipe: stores the address of the embedded
// mInput member in *aInputStream and takes a reference on it.
//
// NOTE(review): GetOutputStream() refuses to hand out its stream while
// !mInited, but no equivalent check is visible here -- confirm whether the
// asymmetry is intentional.
373 nsPipe::GetInputStream(nsIAsyncInputStream
** aInputStream
)
375 NS_ADDREF(*aInputStream
= &mInput
);
// Hands out the output end of the pipe: stores the address of the embedded
// mOutput member in *aOutputStream and takes a reference on it.
//
// Returns NS_ERROR_NOT_INITIALIZED (with a warning) when mInited is false.
380 nsPipe::GetOutputStream(nsIAsyncOutputStream
** aOutputStream
)
382 if (NS_WARN_IF(!mInited
)) {
383 return NS_ERROR_NOT_INITIALIZED
;
385 NS_ADDREF(*aOutputStream
= &mOutput
);
// Reports the readable byte range [aCursor, aLimit) of buffer segment aIndex
// without consuming anything. Per the class declaration this may only be
// called while inside the pipe's monitor.
//
// aIndex  - index of the segment to inspect.
// aCursor - out: start of the range, or nullptr when aIndex is out of range.
// aLimit  - out: end of the range, or nullptr when aIndex is out of range.
//
// NOTE(review): several lines of this function (the guards selecting between
// the branches below) are not visible in this excerpt.
390 nsPipe::PeekSegment(uint32_t aIndex
, char*& aCursor
, char*& aLimit
)
// a non-null read cursor implies the buffer owns at least one segment.
393 NS_ASSERTION(!mReadCursor
|| mBuffer
.GetSegmentCount(), "unexpected state");
// this branch reports from the pipe's current read position rather than
// from the start of a segment.
394 aCursor
= mReadCursor
;
397 uint32_t numSegments
= mBuffer
.GetSegmentCount();
398 if (aIndex
>= numSegments
) {
// no such segment: report an empty range.
399 aCursor
= aLimit
= nullptr;
401 aCursor
= mBuffer
.GetSegment(aIndex
);
// the segment currently being written holds data only up to the write
// cursor; any other segment is reported in full.
402 if (mWriteSegment
== (int32_t)aIndex
) {
403 aLimit
= mWriteCursor
;
405 aLimit
= aCursor
+ mBuffer
.GetSegmentSize();
// Exposes the unread bytes between the read cursor and the read limit.
// Takes the pipe's monitor for the duration of the call.
//
// aSegment    - out: set to mReadCursor.
// aSegmentLen - out: set to mReadLimit - mReadCursor.
//
// When nothing is readable (cursor == limit) the outputs are not written and
// the function returns mStatus if the pipe has failed, otherwise
// NS_BASE_STREAM_WOULD_BLOCK.
412 nsPipe::GetReadSegment(const char*& aSegment
, uint32_t& aSegmentLen
)
414 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
// nothing to read: prefer reporting the pipe's failure status over
// "would block".
416 if (mReadCursor
== mReadLimit
) {
417 return NS_FAILED(mStatus
) ? mStatus
: NS_BASE_STREAM_WOULD_BLOCK
;
420 aSegment
= mReadCursor
;
421 aSegmentLen
= mReadLimit
- mReadCursor
;
// Consumes aBytesRead bytes from the current read segment.
//
// Moves mReadCursor forward, lowers the input stream's available count by
// the same amount and, once the read segment is exhausted, deletes it from
// mBuffer and re-points the cursors at the next segment (or nulls all four
// cursors when the buffer has become empty). The output side is then told
// that room is available.
//
// aBytesRead - number of bytes consumed; asserted to be non-zero and no
//              larger than one segment.
//
// NOTE(review): `events` used below is declared on a line that is not visible
// in this excerpt.
426 nsPipe::AdvanceReadCursor(uint32_t aBytesRead
)
428 NS_ASSERTION(aBytesRead
, "don't call if no bytes read");
432 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
434 LOG(("III advancing read cursor by %u\n", aBytesRead
));
435 NS_ASSERTION(aBytesRead
<= mBuffer
.GetSegmentSize(), "read too much");
437 mReadCursor
+= aBytesRead
;
438 NS_ASSERTION(mReadCursor
<= mReadLimit
, "read cursor exceeds limit");
// keep the input stream's byte count in step with the cursor.
440 mInput
.ReduceAvailable(aBytesRead
);
442 if (mReadCursor
== mReadLimit
) {
443 // we've reached the limit of how much we can read from this segment.
444 // if at the end of this segment, then we must discard this segment.
446 // if still writing in this segment then bail because we're not done
447 // with the segment and have to wait for now...
448 if (mWriteSegment
== 0 && mWriteLimit
> mWriteCursor
) {
// reader has caught up with the writer inside the shared first segment.
449 NS_ASSERTION(mReadLimit
== mWriteCursor
, "unexpected state");
453 // shift write segment index (-1 indicates an empty buffer).
456 // done with this segment
457 mBuffer
.DeleteFirstSegment();
458 LOG(("III deleting first segment\n"));
460 if (mWriteSegment
== -1) {
461 // buffer is completely empty
462 mReadCursor
= nullptr;
463 mReadLimit
= nullptr;
464 mWriteCursor
= nullptr;
465 mWriteLimit
= nullptr;
467 // advance read cursor and limit to next buffer segment
468 mReadCursor
= mBuffer
.GetSegment(0);
// if the writer is in that same segment, only the bytes written so far
// are readable; otherwise the whole segment is.
469 if (mWriteSegment
== 0) {
470 mReadLimit
= mWriteCursor
;
472 mReadLimit
= mReadCursor
+ mBuffer
.GetSegmentSize();
476 // we've freed up a segment, so notify output stream that pipe has
477 // room for a new segment.
478 if (mOutput
.OnOutputWritable(events
)) {
// Exposes the free space between the write cursor and the write limit,
// appending a new segment to mBuffer first when the current one is full (or
// when the buffer is empty). Takes the pipe's monitor for the call.
//
// aSegment    - out: set to mWriteCursor.
// aSegmentLen - out: set to mWriteLimit - mWriteCursor.
//
// Returns NS_BASE_STREAM_WOULD_BLOCK on the path following the
// AppendNewSegment() call; the guard for that return is not visible in this
// excerpt (presumably a failed append, i.e. the buffer is at its maximum
// size -- confirm). The action taken when mStatus has failed is likewise not
// visible here.
486 nsPipe::GetWriteSegment(char*& aSegment
, uint32_t& aSegmentLen
)
488 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
490 if (NS_FAILED(mStatus
)) {
494 // write cursor and limit may both be null indicating an empty buffer.
495 if (mWriteCursor
== mWriteLimit
) {
496 char* seg
= mBuffer
.AppendNewSegment();
499 return NS_BASE_STREAM_WOULD_BLOCK
;
501 LOG(("OOO appended new segment\n"));
// the fresh segment is writable in its entirety.
503 mWriteLimit
= mWriteCursor
+ mBuffer
.GetSegmentSize();
507 // make sure read cursor is initialized
509 NS_ASSERTION(mWriteSegment
== 0, "unexpected null read cursor");
// empty readable range positioned at the write cursor; AdvanceWriteCursor
// extends mReadLimit as data arrives.
510 mReadCursor
= mReadLimit
= mWriteCursor
;
513 // check to see if we can roll-back our read and write cursors to the
514 // beginning of the current/first segment. this is purely an optimization.
515 if (mReadCursor
== mWriteCursor
&& mWriteSegment
== 0) {
516 char* head
= mBuffer
.GetSegment(0);
// NOTE(review): mWriteCursor - head is a pointer difference but is printed
// with %u -- confirm this is safe on 64-bit targets.
517 LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor
- head
));
518 mWriteCursor
= mReadCursor
= mReadLimit
= head
;
521 aSegment
= mWriteCursor
;
522 aSegmentLen
= mWriteLimit
- mWriteCursor
;
// Commits aBytesWritten bytes that the caller has placed at the write cursor.
//
// Moves mWriteCursor forward, extends mReadLimit when the reader is working
// in the same (first) segment, marks the output stream non-writable when the
// current segment is full and the buffer has reached its maximum size, and
// finally tells the input stream that more data is readable.
//
// aBytesWritten - number of bytes committed; asserted to be non-zero and not
//                 to run past mWriteLimit.
//
// NOTE(review): `events` used below is declared on a line that is not visible
// in this excerpt.
527 nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten
)
529 NS_ASSERTION(aBytesWritten
, "don't call if no bytes written");
533 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
535 LOG(("OOO advancing write cursor by %u\n", aBytesWritten
));
// compute the new position first: the comparison below still needs the old
// mWriteCursor.
537 char* newWriteCursor
= mWriteCursor
+ aBytesWritten
;
538 NS_ASSERTION(newWriteCursor
<= mWriteLimit
, "write cursor exceeds limit");
540 // update read limit if reading in the same segment
541 if (mWriteSegment
== 0 && mReadLimit
== mWriteCursor
) {
542 mReadLimit
= newWriteCursor
;
545 mWriteCursor
= newWriteCursor
;
547 // The only way mReadCursor == mWriteCursor is if:
549 // - mReadCursor is at the start of a segment (which, based on how
550 // nsSegmentedBuffer works, means that this segment is the "first"
552 // - mWriteCursor points at the location past the end of the current
553 // write segment (so the current write filled the current write
554 // segment, so we've incremented mWriteCursor to point past the end
556 // - the segment to which data has just been written is located
557 // exactly one segment's worth of bytes before the first segment
558 // where mReadCursor is located
560 // Consequently, the byte immediately after the end of the current
561 // write segment is the first byte of the first segment, so
562 // mReadCursor == mWriteCursor. (Another way to think about this is
563 // to consider the buffer architecture diagram above, but consider it
564 // with an arena allocator which allocates from the *end* of the
565 // arena to the *beginning* of the arena.)
566 NS_ASSERTION(mReadCursor
!= mWriteCursor
||
567 (mBuffer
.GetSegment(0) == mReadCursor
&&
568 mWriteCursor
== mWriteLimit
),
569 "read cursor is bad");
571 // update the writable flag on the output stream
// (only when the current segment is full AND the buffer cannot grow).
572 if (mWriteCursor
== mWriteLimit
) {
573 if (mBuffer
.GetSize() >= mBuffer
.GetMaxSize()) {
574 mOutput
.SetWritable(false);
578 // notify input stream that pipe now contains additional data
579 if (mInput
.OnInputReadable(aBytesWritten
, events
)) {
// Puts the pipe into a failed/closed state and informs both stream ends.
//
// aReason     - the status being raised; forwarded to OnInputException() and
//               OnOutputException().
// aOutputOnly - true when only the output end is being closed (defaults to
//               false in the class declaration). Even then, the visible
//               check below lets the exception reach the input end once no
//               bytes remain available.
//
// A pipe whose mStatus has already failed ignores further exceptions.
//
// NOTE(review): `events` and the statements inside the branches below are on
// lines that are not visible in this excerpt.
586 nsPipe::OnPipeException(nsresult aReason
, bool aOutputOnly
)
588 LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n",
589 aReason
, aOutputOnly
));
593 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
595 // if we've already hit an exception, then ignore this one.
596 if (NS_FAILED(mStatus
)) {
602 // an output-only exception applies to the input end if the pipe has
603 // zero bytes available.
604 if (aOutputOnly
&& !mInput
.Available()) {
// per the declarations of OnInputException/OnOutputException, a true return
// means the pipe's monitor should be notified to wake a blocked reader or
// writer.
609 if (mInput
.OnInputException(aReason
, events
)) {
613 if (mOutput
.OnOutputException(aReason
, events
)) {
619 //-----------------------------------------------------------------------------
620 // nsPipeEvents methods:
621 //-----------------------------------------------------------------------------
// Fires whichever callbacks were recorded through NotifyInputReady() /
// NotifyOutputReady(). Doing this in the destructor is what delays the
// notifications until the end of the enclosing scope.
623 nsPipeEvents::~nsPipeEvents()
625 // dispatch any pending events
// each callback is invoked with the stream that was stored alongside it.
627 if (mInputCallback
) {
628 mInputCallback
->OnInputStreamReady(mInputStream
);
632 if (mOutputCallback
) {
633 mOutputCallback
->OnOutputStreamReady(mOutputStream
);
639 //-----------------------------------------------------------------------------
640 // nsPipeInputStream methods:
641 //-----------------------------------------------------------------------------
643 NS_IMPL_QUERY_INTERFACE(nsPipeInputStream
,
647 nsISearchableInputStream
,
650 NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream
,
654 nsISearchableInputStream
)
656 NS_IMPL_THREADSAFE_CI(nsPipeInputStream
)
// Synchronously waits for the pipe to become readable. Asserts that the
// stream is in blocking mode and holds the pipe's monitor throughout.
//
// Loops for as long as the pipe status is successful and no bytes are
// available, so the condition is re-checked after every wake-up.
//
// Returns NS_OK when the pipe status is NS_BASE_STREAM_CLOSED (a normal close
// is not reported as an error here); otherwise returns the pipe status.
//
// NOTE(review): the statements that actually block inside the loop are on
// lines that are not visible in this excerpt.
659 nsPipeInputStream::Wait()
661 NS_ASSERTION(mBlocking
, "wait on non-blocking pipe input stream");
663 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
665 while (NS_SUCCEEDED(mPipe
->mStatus
) && (mAvailable
== 0)) {
666 LOG(("III pipe input: waiting for data\n"));
672 LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n",
673 mPipe
->mStatus
, mAvailable
));
676 return mPipe
->mStatus
== NS_BASE_STREAM_CLOSED
? NS_OK
: mPipe
->mStatus
;
680 nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten
, nsPipeEvents
& aEvents
)
684 mAvailable
+= aBytesWritten
;
686 if (mCallback
&& !(mCallbackFlags
& WAIT_CLOSURE_ONLY
)) {
687 aEvents
.NotifyInputReady(this, mCallback
);
690 } else if (mBlocked
) {
698 nsPipeInputStream::OnInputException(nsresult aReason
, nsPipeEvents
& aEvents
)
700 LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
705 NS_ASSERTION(NS_FAILED(aReason
), "huh? successful exception");
707 // force count of available bytes to zero.
711 aEvents
.NotifyInputReady(this, mCallback
);
714 } else if (mBlocked
) {
721 NS_IMETHODIMP_(MozExternalRefCountType
)
722 nsPipeInputStream::AddRef(void)
725 return mPipe
->AddRef();
728 NS_IMETHODIMP_(MozExternalRefCountType
)
729 nsPipeInputStream::Release(void)
731 if (--mReaderRefCnt
== 0) {
734 return mPipe
->Release();
738 nsPipeInputStream::CloseWithStatus(nsresult aReason
)
740 LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, aReason
));
742 if (NS_SUCCEEDED(aReason
)) {
743 aReason
= NS_BASE_STREAM_CLOSED
;
746 mPipe
->OnPipeException(aReason
);
751 nsPipeInputStream::Close()
753 return CloseWithStatus(NS_BASE_STREAM_CLOSED
);
// Reports how many bytes can currently be read from the pipe.
//
// aResult - out: receives mAvailable widened to 64 bits.
//
// Returns the pipe's status when the pipe has failed AND nothing is left to
// read; a failed pipe that still holds unread bytes reports those bytes
// instead, so the reader can drain them.
757 nsPipeInputStream::Available(uint64_t* aResult
)
759 // nsPipeInputStream supports under 4GB stream only
760 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
762 // return error if pipe closed
763 if (!mAvailable
&& NS_FAILED(mPipe
->mStatus
)) {
764 return mPipe
->mStatus
;
767 *aResult
= (uint64_t)mAvailable
;
772 nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter
,
775 uint32_t* aReadCount
)
777 LOG(("III ReadSegments [this=%x count=%u]\n", this, aCount
));
786 rv
= mPipe
->GetReadSegment(segment
, segmentLen
);
788 // ignore this error if we've already read something.
789 if (*aReadCount
> 0) {
793 if (rv
== NS_BASE_STREAM_WOULD_BLOCK
) {
798 // wait for some data to be written to the pipe
800 if (NS_SUCCEEDED(rv
)) {
804 // ignore this error, just return.
805 if (rv
== NS_BASE_STREAM_CLOSED
) {
809 mPipe
->OnPipeException(rv
);
813 // read no more than aCount
814 if (segmentLen
> aCount
) {
818 uint32_t writeCount
, originalLen
= segmentLen
;
822 rv
= aWriter(this, aClosure
, segment
, *aReadCount
, segmentLen
, &writeCount
);
824 if (NS_FAILED(rv
) || writeCount
== 0) {
826 // any errors returned from the writer end here: do not
827 // propagate to the caller of ReadSegments.
832 NS_ASSERTION(writeCount
<= segmentLen
, "wrote more than expected");
833 segment
+= writeCount
;
834 segmentLen
-= writeCount
;
835 aCount
-= writeCount
;
836 *aReadCount
+= writeCount
;
837 mLogicalOffset
+= writeCount
;
840 if (segmentLen
< originalLen
) {
841 mPipe
->AdvanceReadCursor(originalLen
- segmentLen
);
849 nsPipeInputStream::Read(char* aToBuf
, uint32_t aBufLen
, uint32_t* aReadCount
)
851 return ReadSegments(NS_CopySegmentToBuffer
, aToBuf
, aBufLen
, aReadCount
);
855 nsPipeInputStream::IsNonBlocking(bool* aNonBlocking
)
857 *aNonBlocking
= !mBlocking
;
862 nsPipeInputStream::AsyncWait(nsIInputStreamCallback
* aCallback
,
864 uint32_t aRequestedCount
,
865 nsIEventTarget
* aTarget
)
867 LOG(("III AsyncWait [this=%x]\n", this));
869 nsPipeEvents pipeEvents
;
871 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
873 // replace a pending callback
881 nsCOMPtr
<nsIInputStreamCallback
> proxy
;
883 proxy
= NS_NewInputStreamReadyEvent(aCallback
, aTarget
);
887 if (NS_FAILED(mPipe
->mStatus
) ||
888 (mAvailable
&& !(aFlags
& WAIT_CLOSURE_ONLY
))) {
889 // stream is already closed or readable; post event.
890 pipeEvents
.NotifyInputReady(this, aCallback
);
892 // queue up callback object to be notified when data becomes available
893 mCallback
= aCallback
;
894 mCallbackFlags
= aFlags
;
901 nsPipeInputStream::Seek(int32_t aWhence
, int64_t aOffset
)
903 NS_NOTREACHED("nsPipeInputStream::Seek");
904 return NS_ERROR_NOT_IMPLEMENTED
;
908 nsPipeInputStream::Tell(int64_t* aOffset
)
910 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
912 // return error if pipe closed
913 if (!mAvailable
&& NS_FAILED(mPipe
->mStatus
)) {
914 return mPipe
->mStatus
;
917 *aOffset
= mLogicalOffset
;
922 nsPipeInputStream::SetEOF()
924 NS_NOTREACHED("nsPipeInputStream::SetEOF");
925 return NS_ERROR_NOT_IMPLEMENTED
;
928 #define COMPARE(s1, s2, i) \
930 ? nsCRT::strncasecmp((const char *)s1, (const char *)s2, (uint32_t)i) \
931 : nsCRT::strncmp((const char *)s1, (const char *)s2, (uint32_t)i))
// Scans the buffered (unread) data for aForString without consuming it,
// holding the pipe's monitor for the whole scan.
//
// aForString        - NUL-terminated string to look for (its length is taken
//                     with strlen).
// aOffsetSearchedTo - out: on a match, the offset of the match; on a miss,
//                     how far the search got (see the individual exits).
// aIgnoreCase / aFound are referenced below but their declarations are on
// parameter lines that are not visible in this excerpt.
//
// The scan works on pairs of adjacent segments: it first looks for a match
// lying entirely inside the current segment, then for a match straddling the
// boundary with the next segment.
//
// NOTE(review): many lines (declarations of cursor1/limit1/cursor2/limit2/
// len2, the assignments to *aFound, the returns and the loop that advances to
// the next segment) are not visible in this excerpt.
934 nsPipeInputStream::Search(const char* aForString
,
937 uint32_t* aOffsetSearchedTo
)
939 LOG(("III Search [for=%s ic=%u]\n", aForString
, aIgnoreCase
));
941 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
// index: segment being examined; offset: running offset used to compute the
// reported positions.
945 uint32_t index
= 0, offset
= 0;
946 uint32_t strLen
= strlen(aForString
);
948 mPipe
->PeekSegment(0, cursor1
, limit1
);
// empty first segment: nothing buffered, nothing was searched.
949 if (cursor1
== limit1
) {
951 *aOffsetSearchedTo
= 0;
952 LOG((" result [aFound=%u offset=%u]\n", *aFound
, *aOffsetSearchedTo
));
957 uint32_t i
, len1
= limit1
- cursor1
;
959 // check if the string is in the buffer segment
// NOTE(review): len1 and strLen are both uint32_t, so when len1 < strLen - 1
// the bound len1 - strLen + 1 wraps to a huge value and the loop would read
// past the segment -- confirm whether callers/segment sizes rule this out.
960 for (i
= 0; i
< len1
- strLen
+ 1; i
++) {
961 if (COMPARE(&cursor1
[i
], aForString
, strLen
) == 0) {
963 *aOffsetSearchedTo
= offset
+ i
;
964 LOG((" result [aFound=%u offset=%u]\n", *aFound
, *aOffsetSearchedTo
));
969 // get the next segment
977 mPipe
->PeekSegment(index
, cursor2
, limit2
);
// no further data: report the last position at which a full match could
// still have started.
978 if (cursor2
== limit2
) {
980 *aOffsetSearchedTo
= offset
- strLen
+ 1;
981 LOG((" result [aFound=%u offset=%u]\n", *aFound
, *aOffsetSearchedTo
));
984 len2
= limit2
- cursor2
;
986 // check if the string is straddling the next buffer segment
// each iteration tries one split of the string: the first strPart1Len
// characters must match the tail of the current segment and the remaining
// strPart2Len characters the head of the next one.
987 uint32_t lim
= XPCOM_MIN(strLen
, len2
+ 1);
988 for (i
= 0; i
< lim
; ++i
) {
989 uint32_t strPart1Len
= strLen
- i
- 1;
990 uint32_t strPart2Len
= strLen
- strPart1Len
;
991 const char* strPart2
= &aForString
[strLen
- strPart2Len
];
// NOTE(review): unsigned subtraction; wraps if strPart1Len > len1 --
// confirm this cannot happen.
992 uint32_t bufSeg1Offset
= len1
- strPart1Len
;
993 if (COMPARE(&cursor1
[bufSeg1Offset
], aForString
, strPart1Len
) == 0 &&
994 COMPARE(cursor2
, strPart2
, strPart2Len
) == 0) {
996 *aOffsetSearchedTo
= offset
- strPart1Len
;
997 LOG((" result [aFound=%u offset=%u]\n", *aFound
, *aOffsetSearchedTo
));
1002 // finally continue with the next buffer
1007 NS_NOTREACHED("can't get here");
1008 return NS_ERROR_UNEXPECTED
; // keep compiler happy
1011 //-----------------------------------------------------------------------------
1012 // nsPipeOutputStream methods:
1013 //-----------------------------------------------------------------------------
1015 NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream
,
1017 nsIAsyncOutputStream
,
1020 NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream
,
1022 nsIAsyncOutputStream
)
1024 NS_IMPL_THREADSAFE_CI(nsPipeOutputStream
)
// Synchronously waits for the pipe to become writable. Asserts that the
// stream is in blocking mode and holds the pipe's monitor throughout.
//
// Returns NS_OK when the pipe status is NS_BASE_STREAM_CLOSED; otherwise
// returns the pipe status.
//
// NOTE(review): the condition is tested once with `if`, whereas
// nsPipeInputStream::Wait() re-tests its condition in a `while` loop. If the
// wait can wake up while mWritable is still false, this returns without
// space being available -- confirm that callers retry.
// NOTE(review): the statements that actually block are on lines that are not
// visible in this excerpt.
1027 nsPipeOutputStream::Wait()
1029 NS_ASSERTION(mBlocking
, "wait on non-blocking pipe output stream");
1031 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1033 if (NS_SUCCEEDED(mPipe
->mStatus
) && !mWritable
) {
1034 LOG(("OOO pipe output: waiting for space\n"));
1038 LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n",
1039 mPipe
->mStatus
, mWritable
));
1042 return mPipe
->mStatus
== NS_BASE_STREAM_CLOSED
? NS_OK
: mPipe
->mStatus
;
1046 nsPipeOutputStream::OnOutputWritable(nsPipeEvents
& aEvents
)
1048 bool result
= false;
1052 if (mCallback
&& !(mCallbackFlags
& WAIT_CLOSURE_ONLY
)) {
1053 aEvents
.NotifyOutputReady(this, mCallback
);
1056 } else if (mBlocked
) {
1064 nsPipeOutputStream::OnOutputException(nsresult aReason
, nsPipeEvents
& aEvents
)
1066 LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n",
1069 bool result
= false;
1071 NS_ASSERTION(NS_FAILED(aReason
), "huh? successful exception");
1075 aEvents
.NotifyOutputReady(this, mCallback
);
1078 } else if (mBlocked
) {
1086 NS_IMETHODIMP_(MozExternalRefCountType
)
1087 nsPipeOutputStream::AddRef()
1090 return mPipe
->AddRef();
1093 NS_IMETHODIMP_(MozExternalRefCountType
)
1094 nsPipeOutputStream::Release()
1096 if (--mWriterRefCnt
== 0) {
1099 return mPipe
->Release();
1103 nsPipeOutputStream::CloseWithStatus(nsresult aReason
)
1105 LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, aReason
));
1107 if (NS_SUCCEEDED(aReason
)) {
1108 aReason
= NS_BASE_STREAM_CLOSED
;
1111 // input stream may remain open
1112 mPipe
->OnPipeException(aReason
, true);
1117 nsPipeOutputStream::Close()
1119 return CloseWithStatus(NS_BASE_STREAM_CLOSED
);
1123 nsPipeOutputStream::WriteSegments(nsReadSegmentFun aReader
,
1126 uint32_t* aWriteCount
)
1128 LOG(("OOO WriteSegments [this=%x count=%u]\n", this, aCount
));
1130 nsresult rv
= NS_OK
;
1133 uint32_t segmentLen
;
1137 rv
= mPipe
->GetWriteSegment(segment
, segmentLen
);
1138 if (NS_FAILED(rv
)) {
1139 if (rv
== NS_BASE_STREAM_WOULD_BLOCK
) {
1142 // ignore this error if we've already written something
1143 if (*aWriteCount
> 0) {
1148 // wait for the pipe to have an empty segment.
1150 if (NS_SUCCEEDED(rv
)) {
1154 mPipe
->OnPipeException(rv
);
1158 // write no more than aCount
1159 if (segmentLen
> aCount
) {
1160 segmentLen
= aCount
;
1163 uint32_t readCount
, originalLen
= segmentLen
;
1164 while (segmentLen
) {
1167 rv
= aReader(this, aClosure
, segment
, *aWriteCount
, segmentLen
, &readCount
);
1169 if (NS_FAILED(rv
) || readCount
== 0) {
1171 // any errors returned from the aReader end here: do not
1172 // propagate to the caller of WriteSegments.
1177 NS_ASSERTION(readCount
<= segmentLen
, "read more than expected");
1178 segment
+= readCount
;
1179 segmentLen
-= readCount
;
1180 aCount
-= readCount
;
1181 *aWriteCount
+= readCount
;
1182 mLogicalOffset
+= readCount
;
1185 if (segmentLen
< originalLen
) {
1186 mPipe
->AdvanceWriteCursor(originalLen
- segmentLen
);
// Segment-reader callback handed to WriteSegments() by
// nsPipeOutputStream::Write(): fills a pipe segment from a caller-supplied
// raw buffer.
//
// aToRawSegment - destination inside the pipe's write segment.
// aReadCount    - out: always set to aCount (the copy cannot be partial).
// aClosure, aOffset and aCount are used below but declared on parameter
// lines that are not visible in this excerpt; aClosure is treated as the
// start of the source buffer and aOffset as the position within it.
1194 nsReadFromRawBuffer(nsIOutputStream
* aOutStr
,
1196 char* aToRawSegment
,
1199 uint32_t* aReadCount
)
1201 const char* fromBuf
= (const char*)aClosure
;
1202 memcpy(aToRawSegment
, &fromBuf
[aOffset
], aCount
);
1203 *aReadCount
= aCount
;
1208 nsPipeOutputStream::Write(const char* aFromBuf
,
1210 uint32_t* aWriteCount
)
1212 return WriteSegments(nsReadFromRawBuffer
, (void*)aFromBuf
, aBufLen
, aWriteCount
);
1216 nsPipeOutputStream::Flush(void)
1223 nsReadFromInputStream(nsIOutputStream
* aOutStr
,
1225 char* aToRawSegment
,
1228 uint32_t* aReadCount
)
1230 nsIInputStream
* fromStream
= (nsIInputStream
*)aClosure
;
1231 return fromStream
->Read(aToRawSegment
, aCount
, aReadCount
);
1235 nsPipeOutputStream::WriteFrom(nsIInputStream
* aFromStream
,
1237 uint32_t* aWriteCount
)
1239 return WriteSegments(nsReadFromInputStream
, aFromStream
, aCount
, aWriteCount
);
1243 nsPipeOutputStream::IsNonBlocking(bool* aNonBlocking
)
1245 *aNonBlocking
= !mBlocking
;
1250 nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback
* aCallback
,
1252 uint32_t aRequestedCount
,
1253 nsIEventTarget
* aTarget
)
1255 LOG(("OOO AsyncWait [this=%x]\n", this));
1257 nsPipeEvents pipeEvents
;
1259 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1261 // replace a pending callback
1269 nsCOMPtr
<nsIOutputStreamCallback
> proxy
;
1271 proxy
= NS_NewOutputStreamReadyEvent(aCallback
, aTarget
);
1275 if (NS_FAILED(mPipe
->mStatus
) ||
1276 (mWritable
&& !(aFlags
& WAIT_CLOSURE_ONLY
))) {
1277 // stream is already closed or writable; post event.
1278 pipeEvents
.NotifyOutputReady(this, aCallback
);
1280 // queue up callback object to be notified when data becomes available
1281 mCallback
= aCallback
;
1282 mCallbackFlags
= aFlags
;
1288 ////////////////////////////////////////////////////////////////////////////////
// Convenience wrapper around NS_NewPipe2() that takes a maximum buffer size
// in bytes instead of a segment count.
//
// aPipeIn / aPipeOut   - out: the two ends of the new pipe (how they are
//                        filled from `in`/`out` is not visible in this
//                        excerpt).
// aSegmentSize         - bytes per segment; 0 selects DEFAULT_SEGMENT_SIZE.
// aNonBlockingInput /
// aNonBlockingOutput   - forwarded unchanged to NS_NewPipe2().
// aMaxSize is used below but declared on a parameter line that is not
// visible in this excerpt.
1291 NS_NewPipe(nsIInputStream
** aPipeIn
,
1292 nsIOutputStream
** aPipeOut
,
1293 uint32_t aSegmentSize
,
1295 bool aNonBlockingInput
,
1296 bool aNonBlockingOutput
)
// resolve the default here because aSegmentSize is used as a divisor below.
1298 if (aSegmentSize
== 0) {
1299 aSegmentSize
= DEFAULT_SEGMENT_SIZE
;
1302 // Handle aMaxSize of UINT32_MAX as a special case
// (it is passed through as a segment count of UINT32_MAX rather than being
// divided).
1303 uint32_t segmentCount
;
1304 if (aMaxSize
== UINT32_MAX
) {
1305 segmentCount
= UINT32_MAX
;
// NOTE(review): integer division; when aMaxSize < aSegmentSize this yields
// 0, and nsPipe::Init() treats a count of 0 as DEFAULT_SEGMENT_COUNT --
// confirm that is the intended result for small maximum sizes.
1307 segmentCount
= aMaxSize
/ aSegmentSize
;
1310 nsIAsyncInputStream
* in
;
1311 nsIAsyncOutputStream
* out
;
1312 nsresult rv
= NS_NewPipe2(&in
, &out
, aNonBlockingInput
, aNonBlockingOutput
,
1313 aSegmentSize
, segmentCount
);
1314 if (NS_FAILED(rv
)) {
1324 NS_NewPipe2(nsIAsyncInputStream
** aPipeIn
,
1325 nsIAsyncOutputStream
** aPipeOut
,
1326 bool aNonBlockingInput
,
1327 bool aNonBlockingOutput
,
1328 uint32_t aSegmentSize
,
1329 uint32_t aSegmentCount
)
1333 nsPipe
* pipe
= new nsPipe();
1335 return NS_ERROR_OUT_OF_MEMORY
;
1338 rv
= pipe
->Init(aNonBlockingInput
,
1342 if (NS_FAILED(rv
)) {
1348 pipe
->GetInputStream(aPipeIn
);
1349 pipe
->GetOutputStream(aPipeOut
);
1354 nsPipeConstructor(nsISupports
* aOuter
, REFNSIID aIID
, void** aResult
)
1357 return NS_ERROR_NO_AGGREGATION
;
1359 nsPipe
* pipe
= new nsPipe();
1361 return NS_ERROR_OUT_OF_MEMORY
;
1364 nsresult rv
= pipe
->QueryInterface(aIID
, aResult
);
1369 ////////////////////////////////////////////////////////////////////////////////