1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 #include "mozilla/Attributes.h"
9 #include "mozilla/IntegerPrintfMacros.h"
10 #include "mozilla/ReentrantMonitor.h"
11 #include "nsIBufferedStreams.h"
12 #include "nsICloneableInputStream.h"
14 #include "nsIEventTarget.h"
15 #include "nsITellableStream.h"
16 #include "mozilla/RefPtr.h"
17 #include "nsSegmentedBuffer.h"
18 #include "nsStreamUtils.h"
22 #include "mozilla/Logging.h"
23 #include "nsIClassInfoImpl.h"
24 #include "nsAlgorithm.h"
26 #include "nsIAsyncInputStream.h"
27 #include "nsIAsyncOutputStream.h"
28 #include "nsIInputStreamPriority.h"
29 #include "nsThreadUtils.h"
31 using namespace mozilla
;
37 // set MOZ_LOG=nsPipe:5
39 static LazyLogModule
sPipeLog("nsPipe");
40 #define LOG(args) MOZ_LOG(sPipeLog, mozilla::LogLevel::Debug, args)
42 #define DEFAULT_SEGMENT_SIZE 4096
43 #define DEFAULT_SEGMENT_COUNT 16
47 class nsPipeInputStream
;
48 class nsPipeOutputStream
;
49 class AutoReadSegment
;
// Result type returned by the stream notification handlers
// (nsPipeInputStream::OnInputReadable / OnInputException and
// nsPipeOutputStream::OnOutputWritable / OnOutputException). Callers in
// nsPipe compare the returned value against NotifyMonitor.
// NOTE(review): presumably NotifyMonitor asks the caller to notify the pipe's
// monitor so that blocked waiters wake up -- confirm against the handlers.
53 enum MonitorAction
{ DoNotNotifyMonitor
, NotifyMonitor
};
// Result type returned by nsPipe::AdvanceReadSegment().
// SegmentAdvanceBufferRead is returned when the reading stream's buffered
// segment count was at or above mMaxAdvanceBufferSegmentCount before the
// advance and is below it afterwards (i.e. a segment was consumed out of the
// "advance buffer", so the writer may advance). SegmentNotChanged is returned
// otherwise.
55 enum SegmentChangeResult
{ SegmentNotChanged
, SegmentAdvanceBufferRead
};
59 //-----------------------------------------------------------------------------
61 class CallbackHolder
{
63 CallbackHolder() = default;
64 MOZ_IMPLICIT
CallbackHolder(std::nullptr_t
) {}
66 CallbackHolder(nsIAsyncInputStream
* aStream
,
67 nsIInputStreamCallback
* aCallback
, uint32_t aFlags
,
68 nsIEventTarget
* aEventTarget
)
69 : mRunnable(aCallback
? NS_NewCancelableRunnableFunction(
70 "nsPipeInputStream AsyncWait Callback",
71 [stream
= nsCOMPtr
{aStream
},
72 callback
= nsCOMPtr
{aCallback
}]() {
73 callback
->OnInputStreamReady(stream
);
76 mEventTarget(aEventTarget
),
79 CallbackHolder(nsIAsyncOutputStream
* aStream
,
80 nsIOutputStreamCallback
* aCallback
, uint32_t aFlags
,
81 nsIEventTarget
* aEventTarget
)
82 : mRunnable(aCallback
? NS_NewCancelableRunnableFunction(
83 "nsPipeOutputStream AsyncWait Callback",
84 [stream
= nsCOMPtr
{aStream
},
85 callback
= nsCOMPtr
{aCallback
}]() {
86 callback
->OnOutputStreamReady(stream
);
89 mEventTarget(aEventTarget
),
92 CallbackHolder(const CallbackHolder
&) = delete;
93 CallbackHolder(CallbackHolder
&&) = default;
94 CallbackHolder
& operator=(const CallbackHolder
&) = delete;
95 CallbackHolder
& operator=(CallbackHolder
&&) = default;
97 CallbackHolder
& operator=(std::nullptr_t
) {
99 mEventTarget
= nullptr;
104 MOZ_IMPLICIT
operator bool() const { return mRunnable
; }
106 uint32_t Flags() const {
107 MOZ_ASSERT(mRunnable
, "Should only be called when a callback is present");
112 nsCOMPtr
<nsIRunnable
> runnable
= mRunnable
.forget();
113 nsCOMPtr
<nsIEventTarget
> eventTarget
= mEventTarget
.forget();
116 eventTarget
->Dispatch(runnable
.forget());
124 nsCOMPtr
<nsIRunnable
> mRunnable
;
125 nsCOMPtr
<nsIEventTarget
> mEventTarget
;
129 //-----------------------------------------------------------------------------
131 // this class is used to delay notifications until the end of a particular
132 // scope. it helps avoid the complexity of issuing callbacks while inside
133 // a critical section.
136 nsPipeEvents() = default;
139 inline void NotifyReady(CallbackHolder aCallback
) {
140 mCallbacks
.AppendElement(std::move(aCallback
));
144 nsTArray
<CallbackHolder
> mCallbacks
;
147 //-----------------------------------------------------------------------------
149 // This class is used to maintain input stream state. It's broken out from the
150 // nsPipeInputStream class because generally the nsPipe should be modifying
151 // this state and not the input stream itself.
152 struct nsPipeReadState
{
154 : mReadCursor(nullptr),
161 // All members of this type are guarded by the pipe monitor, however it cannot
162 // be named from this type, so the less-reliable MOZ_GUARDED_VAR is used
163 // instead. In the future it would be nice to avoid this, especially as
164 // MOZ_GUARDED_VAR is deprecated.
165 char* mReadCursor MOZ_GUARDED_VAR
;
166 char* mReadLimit MOZ_GUARDED_VAR
;
167 int32_t mSegment MOZ_GUARDED_VAR
;
168 uint32_t mAvailable MOZ_GUARDED_VAR
;
170 // This flag is managed using the AutoReadSegment RAII stack class.
171 bool mActiveRead MOZ_GUARDED_VAR
;
173 // Set to indicate that the input stream has closed and should be drained,
174 // but that drain has been delayed due to an active read. When the read
175 // completes, this flag indicates the drain should then be performed.
176 bool mNeedDrain MOZ_GUARDED_VAR
;
179 //-----------------------------------------------------------------------------
181 // an input end of a pipe (maintained as a list of refs within the pipe)
182 class nsPipeInputStream final
: public nsIAsyncInputStream
,
183 public nsITellableStream
,
184 public nsISearchableInputStream
,
185 public nsICloneableInputStream
,
187 public nsIBufferedInputStream
,
188 public nsIInputStreamPriority
{
190 NS_DECL_THREADSAFE_ISUPPORTS
191 NS_DECL_NSIINPUTSTREAM
192 NS_DECL_NSIASYNCINPUTSTREAM
193 NS_DECL_NSITELLABLESTREAM
194 NS_DECL_NSISEARCHABLEINPUTSTREAM
195 NS_DECL_NSICLONEABLEINPUTSTREAM
197 NS_DECL_NSIBUFFEREDINPUTSTREAM
198 NS_DECL_NSIINPUTSTREAMPRIORITY
200 explicit nsPipeInputStream(nsPipe
* aPipe
)
206 mPriority(nsIRunnablePriority::PRIORITY_NORMAL
) {}
208 nsPipeInputStream(const nsPipeInputStream
& aOther
)
209 : mPipe(aOther
.mPipe
),
210 mLogicalOffset(aOther
.mLogicalOffset
),
211 mInputStatus(aOther
.mInputStatus
),
212 mBlocking(aOther
.mBlocking
),
214 mReadState(aOther
.mReadState
),
215 mPriority(nsIRunnablePriority::PRIORITY_NORMAL
) {}
217 void SetNonBlocking(bool aNonBlocking
) { mBlocking
= !aNonBlocking
; }
219 uint32_t Available() MOZ_REQUIRES(Monitor());
221 // synchronously wait for the pipe to become readable.
224 // These two don't acquire the monitor themselves. Instead they
225 // expect their caller to have done so and to pass the monitor as
227 MonitorAction
OnInputReadable(uint32_t aBytesWritten
, nsPipeEvents
&,
228 const ReentrantMonitorAutoEnter
& ev
)
229 MOZ_REQUIRES(Monitor());
230 MonitorAction
OnInputException(nsresult
, nsPipeEvents
&,
231 const ReentrantMonitorAutoEnter
& ev
)
232 MOZ_REQUIRES(Monitor());
234 nsPipeReadState
& ReadState() { return mReadState
; }
236 const nsPipeReadState
& ReadState() const { return mReadState
; }
238 nsresult
Status() const;
240 // A version of Status() that doesn't acquire the monitor.
241 nsresult
Status(const ReentrantMonitorAutoEnter
& ev
) const
242 MOZ_REQUIRES(Monitor());
244 // The status of this input stream, ignoring the status of the underlying
245 // monitor. If this status is errored, the input stream has either already
246 // been removed from the pipe, or will be removed from the pipe shortly.
247 nsresult
InputStatus(const ReentrantMonitorAutoEnter
&) const
248 MOZ_REQUIRES(Monitor()) {
252 ReentrantMonitor
& Monitor() const;
255 virtual ~nsPipeInputStream();
257 RefPtr
<nsPipe
> mPipe
;
259 int64_t mLogicalOffset
;
260 // Individual input streams can be closed without affecting the rest of the
261 // pipe. So track individual input stream status separately. |mInputStatus|
262 // is protected by |mPipe->mReentrantMonitor|.
263 nsresult mInputStatus
MOZ_GUARDED_BY(Monitor());
266 // these variables can only be accessed while inside the pipe's monitor
267 bool mBlocked
MOZ_GUARDED_BY(Monitor());
268 CallbackHolder mCallback
MOZ_GUARDED_BY(Monitor());
270 // requires pipe's monitor to access members; usually treat as an opaque token
272 nsPipeReadState mReadState
;
273 Atomic
<uint32_t, Relaxed
> mPriority
;
276 //-----------------------------------------------------------------------------
278 // the output end of a pipe (allocated as a member of the pipe).
279 class nsPipeOutputStream
: public nsIAsyncOutputStream
, public nsIClassInfo
{
281 // since this class will be allocated as a member of the pipe, we do not
282 // need our own ref count. instead, we share the lifetime (the ref count)
283 // of the entire pipe. this macro is just convenience since it does not
284 // declare a mRefCount variable; however, don't let the name fool you...
285 // we are not inheriting from nsPipe ;-)
286 NS_DECL_ISUPPORTS_INHERITED
288 NS_DECL_NSIOUTPUTSTREAM
289 NS_DECL_NSIASYNCOUTPUTSTREAM
292 explicit nsPipeOutputStream(nsPipe
* aPipe
)
300 void SetNonBlocking(bool aNonBlocking
) { mBlocking
= !aNonBlocking
; }
301 void SetWritable(bool aWritable
) MOZ_REQUIRES(Monitor()) {
302 mWritable
= aWritable
;
305 // synchronously wait for the pipe to become writable.
308 MonitorAction
OnOutputWritable(nsPipeEvents
&) MOZ_REQUIRES(Monitor());
309 MonitorAction
OnOutputException(nsresult
, nsPipeEvents
&)
310 MOZ_REQUIRES(Monitor());
312 ReentrantMonitor
& Monitor() const;
317 // separate refcnt so that we know when to close the producer
318 ThreadSafeAutoRefCnt mWriterRefCnt
;
319 int64_t mLogicalOffset
;
322 // these variables can only be accessed while inside the pipe's monitor
323 bool mBlocked
MOZ_GUARDED_BY(Monitor());
324 bool mWritable
MOZ_GUARDED_BY(Monitor());
325 CallbackHolder mCallback
MOZ_GUARDED_BY(Monitor());
328 //-----------------------------------------------------------------------------
332 friend class nsPipeInputStream
;
333 friend class nsPipeOutputStream
;
334 friend class AutoReadSegment
;
336 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(nsPipe
)
338 // public constructor
339 friend void NS_NewPipe2(nsIAsyncInputStream
**, nsIAsyncOutputStream
**, bool,
340 bool, uint32_t, uint32_t);
343 nsPipe(uint32_t aSegmentSize
, uint32_t aSegmentCount
);
347 // Methods below may only be called while inside the pipe's monitor. Some
348 // of these methods require passing a ReentrantMonitorAutoEnter to prove the
352 void PeekSegment(const nsPipeReadState
& aReadState
, uint32_t aIndex
,
353 char*& aCursor
, char*& aLimit
)
354 MOZ_REQUIRES(mReentrantMonitor
);
355 SegmentChangeResult
AdvanceReadSegment(nsPipeReadState
& aReadState
,
356 const ReentrantMonitorAutoEnter
& ev
)
357 MOZ_REQUIRES(mReentrantMonitor
);
358 bool ReadSegmentBeingWritten(nsPipeReadState
& aReadState
)
359 MOZ_REQUIRES(mReentrantMonitor
);
360 uint32_t CountSegmentReferences(int32_t aSegment
)
361 MOZ_REQUIRES(mReentrantMonitor
);
362 void SetAllNullReadCursors() MOZ_REQUIRES(mReentrantMonitor
);
363 bool AllReadCursorsMatchWriteCursor() MOZ_REQUIRES(mReentrantMonitor
);
364 void RollBackAllReadCursors(char* aWriteCursor
)
365 MOZ_REQUIRES(mReentrantMonitor
);
366 void UpdateAllReadCursors(char* aWriteCursor
) MOZ_REQUIRES(mReentrantMonitor
);
367 void ValidateAllReadCursors() MOZ_REQUIRES(mReentrantMonitor
);
368 uint32_t GetBufferSegmentCount(const nsPipeReadState
& aReadState
,
369 const ReentrantMonitorAutoEnter
& ev
) const
370 MOZ_REQUIRES(mReentrantMonitor
);
371 bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter
& ev
) const
372 MOZ_REQUIRES(mReentrantMonitor
);
375 // methods below may be called while outside the pipe's monitor
378 void DrainInputStream(nsPipeReadState
& aReadState
, nsPipeEvents
& aEvents
);
379 nsresult
GetWriteSegment(char*& aSegment
, uint32_t& aSegmentLen
);
380 void AdvanceWriteCursor(uint32_t aCount
);
382 void OnInputStreamException(nsPipeInputStream
* aStream
, nsresult aReason
);
383 void OnPipeException(nsresult aReason
, bool aOutputOnly
= false);
385 nsresult
CloneInputStream(nsPipeInputStream
* aOriginal
,
386 nsIInputStream
** aCloneOut
);
388 // methods below should only be called by AutoReadSegment
389 nsresult
GetReadSegment(nsPipeReadState
& aReadState
, const char*& aSegment
,
391 void ReleaseReadSegment(nsPipeReadState
& aReadState
, nsPipeEvents
& aEvents
);
392 void AdvanceReadCursor(nsPipeReadState
& aReadState
, uint32_t aCount
);
394 // We can't inherit from both nsIInputStream and nsIOutputStream
395 // because they collide on their Close method. Consequently we nest their
396 // implementations to avoid the extra object allocation.
397 nsPipeOutputStream mOutput
;
399 // Since the input stream can be cloned, we may have more than one. Use
400 // a weak reference as the streams will clear their entry here in their
401 // destructor. Using a strong reference would create a reference cycle.
402 // Only usable while mReentrantMonitor is locked.
403 nsTArray
<nsPipeInputStream
*> mInputList
MOZ_GUARDED_BY(mReentrantMonitor
);
405 ReentrantMonitor mReentrantMonitor
;
406 nsSegmentedBuffer mBuffer
MOZ_GUARDED_BY(mReentrantMonitor
);
408 // The maximum number of segments to allow to be buffered in advance
409 // of the fastest reader. This collection of segments is called
410 // the "advance buffer".
411 uint32_t mMaxAdvanceBufferSegmentCount
MOZ_GUARDED_BY(mReentrantMonitor
);
413 int32_t mWriteSegment
MOZ_GUARDED_BY(mReentrantMonitor
);
414 char* mWriteCursor
MOZ_GUARDED_BY(mReentrantMonitor
);
415 char* mWriteLimit
MOZ_GUARDED_BY(mReentrantMonitor
);
417 // |mStatus| is protected by |mReentrantMonitor|.
418 nsresult mStatus
MOZ_GUARDED_BY(mReentrantMonitor
);
421 //-----------------------------------------------------------------------------
423 // Declarations of Monitor() methods on the streams.
425 // These must be placed early to provide MOZ_RETURN_CAPABILITY annotations for
426 // the thread-safety analysis. This couldn't be done at the declaration due to
427 // nsPipe not yet being defined.
429 ReentrantMonitor
& nsPipeOutputStream::Monitor() const
430 MOZ_RETURN_CAPABILITY(mPipe
->mReentrantMonitor
) {
431 return mPipe
->mReentrantMonitor
;
434 ReentrantMonitor
& nsPipeInputStream::Monitor() const
435 MOZ_RETURN_CAPABILITY(mPipe
->mReentrantMonitor
) {
436 return mPipe
->mReentrantMonitor
;
439 //-----------------------------------------------------------------------------
441 // RAII class representing an active read segment. When it goes out of scope
442 // it automatically updates the read cursor and releases the read segment.
443 class MOZ_STACK_CLASS AutoReadSegment final
{
445 AutoReadSegment(nsPipe
* aPipe
, nsPipeReadState
& aReadState
,
448 mReadState(aReadState
),
449 mStatus(NS_ERROR_FAILURE
),
453 MOZ_DIAGNOSTIC_ASSERT(mPipe
);
454 MOZ_DIAGNOSTIC_ASSERT(!mReadState
.mActiveRead
);
455 mStatus
= mPipe
->GetReadSegment(mReadState
, mSegment
, mLength
);
456 if (NS_SUCCEEDED(mStatus
)) {
457 MOZ_DIAGNOSTIC_ASSERT(mReadState
.mActiveRead
);
458 MOZ_DIAGNOSTIC_ASSERT(mSegment
);
459 mLength
= std::min(mLength
, aMaxLength
);
460 MOZ_DIAGNOSTIC_ASSERT(mLength
);
465 if (NS_SUCCEEDED(mStatus
)) {
467 mPipe
->AdvanceReadCursor(mReadState
, mOffset
);
470 mPipe
->ReleaseReadSegment(mReadState
, events
);
473 MOZ_DIAGNOSTIC_ASSERT(!mReadState
.mActiveRead
);
476 nsresult
Status() const { return mStatus
; }
478 const char* Data() const {
479 MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus
));
480 MOZ_DIAGNOSTIC_ASSERT(mSegment
);
481 return mSegment
+ mOffset
;
484 uint32_t Length() const {
485 MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus
));
486 MOZ_DIAGNOSTIC_ASSERT(mLength
>= mOffset
);
487 return mLength
- mOffset
;
490 void Advance(uint32_t aCount
) {
491 MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus
));
492 MOZ_DIAGNOSTIC_ASSERT(aCount
<= (mLength
- mOffset
));
496 nsPipeReadState
& ReadState() const { return mReadState
; }
499 // guaranteed to remain alive due to limited stack lifetime of AutoReadSegment
501 nsPipeReadState
& mReadState
;
503 const char* mSegment
;
509 // NOTES on buffer architecture:
511 // +-----------------+ - - mBuffer.GetSegment(0)
513 // + - - - - - - - - + - - nsPipeReadState.mReadCursor
514 // |/////////////////|
515 // |/////////////////|
516 // |/////////////////|
517 // |/////////////////|
518 // +-----------------+ - - nsPipeReadState.mReadLimit
520 // +-----------------+
521 // |/////////////////|
522 // |/////////////////|
523 // |/////////////////|
524 // |/////////////////|
525 // |/////////////////|
526 // |/////////////////|
527 // +-----------------+
529 // +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
530 // |/////////////////|
531 // |/////////////////|
532 // |/////////////////|
533 // + - - - - - - - - + - - mWriteCursor
536 // +-----------------+ - - mWriteLimit
538 // (shaded region contains data)
540 // NOTE: Each input stream produced by the nsPipe contains its own, separate
541 // nsPipeReadState. This means there are multiple mReadCursor and
542 // mReadLimit values in play. The pipe cannot discard old data until
543 // all mReadCursors have moved beyond that point in the stream.
545 // Likewise, each input stream reader will have its own amount of
546 // buffered data. The pipe size threshold, however, is only applied
547 // to the input stream that is being read fastest. We call this
548 // the "advance buffer" in that it's in advance of all readers. We
549 // allow slower input streams to buffer more data so that we don't
550 // stall processing of the faster input stream.
552 // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
553 // small allocations (e.g., 64 byte allocations). this means that buffers may
554 // be allocated back-to-back. in the diagram above, for example, mReadLimit
555 // would actually be pointing at the beginning of the next segment. when
556 // making changes to this file, please keep this fact in mind.
559 //-----------------------------------------------------------------------------
561 //-----------------------------------------------------------------------------
563 nsPipe::nsPipe(uint32_t aSegmentSize
, uint32_t aSegmentCount
)
565 mReentrantMonitor("nsPipe.mReentrantMonitor"),
566 // protect against overflow
567 mMaxAdvanceBufferSegmentCount(
568 std::min(aSegmentCount
, UINT32_MAX
/ aSegmentSize
)),
570 mWriteCursor(nullptr),
571 mWriteLimit(nullptr),
573 // The internal buffer is always "infinite" so that we can allow
574 // the size to expand when cloned streams are read at different
575 // rates. We enforce a limit on how much data can be buffered
576 // ahead of the fastest reader in GetWriteSegment().
577 MOZ_ALWAYS_SUCCEEDS(mBuffer
.Init(aSegmentSize
));
580 nsPipe::~nsPipe() = default;
582 void nsPipe::PeekSegment(const nsPipeReadState
& aReadState
, uint32_t aIndex
,
583 char*& aCursor
, char*& aLimit
) {
585 MOZ_DIAGNOSTIC_ASSERT(!aReadState
.mReadCursor
|| mBuffer
.GetSegmentCount());
586 aCursor
= aReadState
.mReadCursor
;
587 aLimit
= aReadState
.mReadLimit
;
589 uint32_t absoluteIndex
= aReadState
.mSegment
+ aIndex
;
590 uint32_t numSegments
= mBuffer
.GetSegmentCount();
591 if (absoluteIndex
>= numSegments
) {
592 aCursor
= aLimit
= nullptr;
594 aCursor
= mBuffer
.GetSegment(absoluteIndex
);
595 if (mWriteSegment
== (int32_t)absoluteIndex
) {
596 aLimit
= mWriteCursor
;
598 aLimit
= aCursor
+ mBuffer
.GetSegmentSize();
604 nsresult
nsPipe::GetReadSegment(nsPipeReadState
& aReadState
,
605 const char*& aSegment
, uint32_t& aLength
) {
606 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
608 if (aReadState
.mReadCursor
== aReadState
.mReadLimit
) {
609 return NS_FAILED(mStatus
) ? mStatus
: NS_BASE_STREAM_WOULD_BLOCK
;
612 // The input stream locks the pipe while getting the buffer to read from,
613 // but then unlocks while actual data copying is taking place. In
614 // order to avoid deleting the buffer out from under this lockless read
615 // set a flag to indicate a read is active. This flag is only modified
616 // while the lock is held.
617 MOZ_DIAGNOSTIC_ASSERT(!aReadState
.mActiveRead
);
618 aReadState
.mActiveRead
= true;
620 aSegment
= aReadState
.mReadCursor
;
621 aLength
= aReadState
.mReadLimit
- aReadState
.mReadCursor
;
622 MOZ_DIAGNOSTIC_ASSERT(aLength
<= aReadState
.mAvailable
);
627 void nsPipe::ReleaseReadSegment(nsPipeReadState
& aReadState
,
628 nsPipeEvents
& aEvents
) {
629 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
631 MOZ_DIAGNOSTIC_ASSERT(aReadState
.mActiveRead
);
632 aReadState
.mActiveRead
= false;
634 // When a read completes and releases the mActiveRead flag, we may have
635 // blocked a drain from completing. This occurs when the input stream is
636 // closed during the read. In these cases, we need to complete the drain as
637 // soon as the active read completes.
638 if (aReadState
.mNeedDrain
) {
639 aReadState
.mNeedDrain
= false;
640 DrainInputStream(aReadState
, aEvents
);
644 void nsPipe::AdvanceReadCursor(nsPipeReadState
& aReadState
,
645 uint32_t aBytesRead
) {
646 MOZ_DIAGNOSTIC_ASSERT(aBytesRead
> 0);
650 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
652 LOG(("III advancing read cursor by %u\n", aBytesRead
));
653 MOZ_DIAGNOSTIC_ASSERT(aBytesRead
<= mBuffer
.GetSegmentSize());
655 aReadState
.mReadCursor
+= aBytesRead
;
656 MOZ_DIAGNOSTIC_ASSERT(aReadState
.mReadCursor
<= aReadState
.mReadLimit
);
658 MOZ_DIAGNOSTIC_ASSERT(aReadState
.mAvailable
>= aBytesRead
);
659 aReadState
.mAvailable
-= aBytesRead
;
661 // Check to see if we're at the end of the available read data. If we
662 // are, and this segment is not still being written, then we can possibly
663 // free up the segment.
664 if (aReadState
.mReadCursor
== aReadState
.mReadLimit
&&
665 !ReadSegmentBeingWritten(aReadState
)) {
666 // Advance the segment position. If we have read any segments from the
667 // advance buffer then we can potentially notify blocked writers.
668 mOutput
.Monitor().AssertCurrentThreadIn();
669 if (AdvanceReadSegment(aReadState
, mon
) == SegmentAdvanceBufferRead
&&
670 mOutput
.OnOutputWritable(events
) == NotifyMonitor
) {
675 ReleaseReadSegment(aReadState
, events
);
679 SegmentChangeResult
nsPipe::AdvanceReadSegment(
680 nsPipeReadState
& aReadState
, const ReentrantMonitorAutoEnter
& ev
) {
681 // Calculate how many segments are buffered for this stream to start.
682 uint32_t startBufferSegments
= GetBufferSegmentCount(aReadState
, ev
);
684 int32_t currentSegment
= aReadState
.mSegment
;
686 // Move to the next segment to read
687 aReadState
.mSegment
+= 1;
689 // If this was the last reference to the first segment, then remove it.
690 if (currentSegment
== 0 && CountSegmentReferences(currentSegment
) == 0) {
691 // shift write and read segment index (-1 indicates an empty buffer).
694 // Directly modify the current read state. If the associated input
695 // stream is closed simultaneous with reading, then it may not be
696 // in the mInputList any more.
697 aReadState
.mSegment
-= 1;
699 for (uint32_t i
= 0; i
< mInputList
.Length(); ++i
) {
700 // Skip the current read state structure since we modify it manually
701 // before entering this loop.
702 if (&mInputList
[i
]->ReadState() == &aReadState
) {
705 mInputList
[i
]->ReadState().mSegment
-= 1;
708 // done with this segment
709 mBuffer
.DeleteFirstSegment();
710 LOG(("III deleting first segment\n"));
713 if (mWriteSegment
< aReadState
.mSegment
) {
714 // read cursor has hit the end of written data, so reset it
715 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment
== (aReadState
.mSegment
- 1));
716 aReadState
.mReadCursor
= nullptr;
717 aReadState
.mReadLimit
= nullptr;
718 // also, the buffer is completely empty, so reset the write cursor
719 if (mWriteSegment
== -1) {
720 mWriteCursor
= nullptr;
721 mWriteLimit
= nullptr;
724 // advance read cursor and limit to next buffer segment
725 aReadState
.mReadCursor
= mBuffer
.GetSegment(aReadState
.mSegment
);
726 if (mWriteSegment
== aReadState
.mSegment
) {
727 aReadState
.mReadLimit
= mWriteCursor
;
729 aReadState
.mReadLimit
= aReadState
.mReadCursor
+ mBuffer
.GetSegmentSize();
733 // Calculate how many segments are buffered for the stream after
735 uint32_t endBufferSegments
= GetBufferSegmentCount(aReadState
, ev
);
737 // If the stream has read a segment out of the set of advanced buffer
738 // segments, then the writer may advance.
739 if (startBufferSegments
>= mMaxAdvanceBufferSegmentCount
&&
740 endBufferSegments
< mMaxAdvanceBufferSegmentCount
) {
741 return SegmentAdvanceBufferRead
;
744 // Otherwise there are no significant changes to the segment structure.
745 return SegmentNotChanged
;
748 void nsPipe::DrainInputStream(nsPipeReadState
& aReadState
,
749 nsPipeEvents
& aEvents
) {
750 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
752 // If a segment is actively being read in ReadSegments() for this input
753 // stream, then we cannot drain the stream. This can happen because
754 // ReadSegments() does not hold the lock while copying from the buffer.
755 // If we detect this condition, simply note that we need a drain once
756 // the read completes and return immediately.
757 if (aReadState
.mActiveRead
) {
758 MOZ_DIAGNOSTIC_ASSERT(!aReadState
.mNeedDrain
);
759 aReadState
.mNeedDrain
= true;
763 while (mWriteSegment
>= aReadState
.mSegment
) {
764 // If the last segment to free is still being written to, we're done
765 // draining. We can't free any more.
766 if (ReadSegmentBeingWritten(aReadState
)) {
770 // Don't bother checking if this results in an advance buffer segment
771 // read. Since we are draining the entire stream we will read an
772 // advance buffer segment no matter what.
773 AdvanceReadSegment(aReadState
, mon
);
776 // Force the stream into an empty state. Make sure mAvailable, mReadCursor, and
777 // mReadLimit are consistent with one another.
778 aReadState
.mAvailable
= 0;
779 aReadState
.mReadCursor
= nullptr;
780 aReadState
.mReadLimit
= nullptr;
782 // Remove the input stream from the pipe's list of streams. This will
783 // prevent the pipe from holding the stream alive or trying to update
784 // its read state any further.
785 DebugOnly
<uint32_t> numRemoved
= 0;
786 mInputList
.RemoveElementsBy([&](nsPipeInputStream
* aEntry
) {
787 bool result
= &aReadState
== &aEntry
->ReadState();
788 numRemoved
+= result
? 1 : 0;
791 MOZ_ASSERT(numRemoved
== 1);
793 // If we have read any segments from the advance buffer then we can
794 // potentially notify blocked writers.
795 mOutput
.Monitor().AssertCurrentThreadIn();
796 if (!IsAdvanceBufferFull(mon
) &&
797 mOutput
.OnOutputWritable(aEvents
) == NotifyMonitor
) {
802 bool nsPipe::ReadSegmentBeingWritten(nsPipeReadState
& aReadState
) {
803 mReentrantMonitor
.AssertCurrentThreadIn();
805 mWriteSegment
== aReadState
.mSegment
&& mWriteLimit
> mWriteCursor
;
806 MOZ_DIAGNOSTIC_ASSERT(!beingWritten
|| aReadState
.mReadLimit
== mWriteCursor
);
810 nsresult
nsPipe::GetWriteSegment(char*& aSegment
, uint32_t& aSegmentLen
) {
811 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
813 if (NS_FAILED(mStatus
)) {
817 // write cursor and limit may both be null indicating an empty buffer.
818 if (mWriteCursor
== mWriteLimit
) {
819 // The pipe is full if we have hit our limit on advance data buffering.
820 // This means the fastest reader is still reading slower than data is
821 // being written into the pipe.
822 if (IsAdvanceBufferFull(mon
)) {
823 return NS_BASE_STREAM_WOULD_BLOCK
;
826 // The nsSegmentedBuffer is configured to be "infinite", so this
827 // should never return nullptr here.
828 char* seg
= mBuffer
.AppendNewSegment();
830 return NS_ERROR_OUT_OF_MEMORY
;
833 LOG(("OOO appended new segment\n"));
835 mWriteLimit
= mWriteCursor
+ mBuffer
.GetSegmentSize();
839 // make sure read cursor is initialized
840 SetAllNullReadCursors();
842 // check to see if we can roll-back our read and write cursors to the
843 // beginning of the current/first segment. this is purely an optimization.
844 if (mWriteSegment
== 0 && AllReadCursorsMatchWriteCursor()) {
845 char* head
= mBuffer
.GetSegment(0);
846 LOG(("OOO rolling back write cursor %" PRId64
" bytes\n",
847 static_cast<int64_t>(mWriteCursor
- head
)));
848 RollBackAllReadCursors(head
);
852 aSegment
= mWriteCursor
;
853 aSegmentLen
= mWriteLimit
- mWriteCursor
;
857 void nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten
) {
858 MOZ_DIAGNOSTIC_ASSERT(aBytesWritten
> 0);
862 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
864 LOG(("OOO advancing write cursor by %u\n", aBytesWritten
));
866 char* newWriteCursor
= mWriteCursor
+ aBytesWritten
;
867 MOZ_DIAGNOSTIC_ASSERT(newWriteCursor
<= mWriteLimit
);
869 // update read limit if reading in the same segment
870 UpdateAllReadCursors(newWriteCursor
);
872 mWriteCursor
= newWriteCursor
;
874 ValidateAllReadCursors();
876 // update the writable flag on the output stream
877 if (mWriteCursor
== mWriteLimit
) {
878 mOutput
.Monitor().AssertCurrentThreadIn();
879 mOutput
.SetWritable(!IsAdvanceBufferFull(mon
));
882 // notify input stream that pipe now contains additional data
883 bool needNotify
= false;
884 for (uint32_t i
= 0; i
< mInputList
.Length(); ++i
) {
885 mInputList
[i
]->Monitor().AssertCurrentThreadIn();
886 if (mInputList
[i
]->OnInputReadable(aBytesWritten
, events
, mon
) ==
898 void nsPipe::OnInputStreamException(nsPipeInputStream
* aStream
,
900 MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason
));
904 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
906 // It's possible to re-enter this method when we call OnPipeException() or
907 // OnInputException() below. If there is a caller stuck in our synchronous
908 // Wait() method, then they will get woken up with a failure code which
909 // re-enters this method. Therefore, gracefully handle unknown streams
912 // If we only have one stream open and it is the given stream, then shut
913 // down the entire pipe.
914 if (mInputList
.Length() == 1) {
915 if (mInputList
[0] == aStream
) {
916 OnPipeException(aReason
);
921 // Otherwise just close the particular stream that hit an exception.
922 for (uint32_t i
= 0; i
< mInputList
.Length(); ++i
) {
923 if (mInputList
[i
] != aStream
) {
927 mInputList
[i
]->Monitor().AssertCurrentThreadIn();
928 MonitorAction action
=
929 mInputList
[i
]->OnInputException(aReason
, events
, mon
);
931 // Notify after element is removed in case we re-enter as a result.
932 if (action
== NotifyMonitor
) {
941 void nsPipe::OnPipeException(nsresult aReason
, bool aOutputOnly
) {
942 LOG(("PPP nsPipe::OnPipeException [reason=%" PRIx32
" output-only=%d]\n",
943 static_cast<uint32_t>(aReason
), aOutputOnly
));
947 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
949 // if we've already hit an exception, then ignore this one.
950 if (NS_FAILED(mStatus
)) {
956 bool needNotify
= false;
958 // OnInputException() can drain the stream and remove it from
959 // mInputList. So iterate over a temp list instead.
960 nsTArray
<nsPipeInputStream
*> list
= mInputList
.Clone();
961 for (uint32_t i
= 0; i
< list
.Length(); ++i
) {
962 // an output-only exception applies to the input end if the pipe has
963 // zero bytes available.
964 list
[i
]->Monitor().AssertCurrentThreadIn();
965 if (aOutputOnly
&& list
[i
]->Available()) {
969 if (list
[i
]->OnInputException(aReason
, events
, mon
) == NotifyMonitor
) {
974 mOutput
.Monitor().AssertCurrentThreadIn();
975 if (mOutput
.OnOutputException(aReason
, events
) == NotifyMonitor
) {
979 // Notify after we have removed any input streams from mInputList
986 nsresult
nsPipe::CloneInputStream(nsPipeInputStream
* aOriginal
,
987 nsIInputStream
** aCloneOut
) {
988 ReentrantMonitorAutoEnter
mon(mReentrantMonitor
);
989 RefPtr
<nsPipeInputStream
> ref
= new nsPipeInputStream(*aOriginal
);
990 // don't add clones of closed pipes to mInputList.
991 ref
->Monitor().AssertCurrentThreadIn();
992 if (NS_SUCCEEDED(ref
->InputStatus(mon
))) {
993 mInputList
.AppendElement(ref
);
995 nsCOMPtr
<nsIAsyncInputStream
> upcast
= std::move(ref
);
996 upcast
.forget(aCloneOut
);
1000 uint32_t nsPipe::CountSegmentReferences(int32_t aSegment
) {
1001 mReentrantMonitor
.AssertCurrentThreadIn();
1003 for (uint32_t i
= 0; i
< mInputList
.Length(); ++i
) {
1004 if (aSegment
>= mInputList
[i
]->ReadState().mSegment
) {
1011 void nsPipe::SetAllNullReadCursors() {
1012 mReentrantMonitor
.AssertCurrentThreadIn();
1013 for (uint32_t i
= 0; i
< mInputList
.Length(); ++i
) {
1014 nsPipeReadState
& readState
= mInputList
[i
]->ReadState();
1015 if (!readState
.mReadCursor
) {
1016 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment
== readState
.mSegment
);
1017 readState
.mReadCursor
= readState
.mReadLimit
= mWriteCursor
;
1022 bool nsPipe::AllReadCursorsMatchWriteCursor() {
1023 mReentrantMonitor
.AssertCurrentThreadIn();
1024 for (uint32_t i
= 0; i
< mInputList
.Length(); ++i
) {
1025 const nsPipeReadState
& readState
= mInputList
[i
]->ReadState();
1026 if (readState
.mSegment
!= mWriteSegment
||
1027 readState
.mReadCursor
!= mWriteCursor
) {
1034 void nsPipe::RollBackAllReadCursors(char* aWriteCursor
) {
1035 mReentrantMonitor
.AssertCurrentThreadIn();
1036 for (uint32_t i
= 0; i
< mInputList
.Length(); ++i
) {
1037 nsPipeReadState
& readState
= mInputList
[i
]->ReadState();
1038 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment
== readState
.mSegment
);
1039 MOZ_DIAGNOSTIC_ASSERT(mWriteCursor
== readState
.mReadCursor
);
1040 MOZ_DIAGNOSTIC_ASSERT(mWriteCursor
== readState
.mReadLimit
);
1041 readState
.mReadCursor
= aWriteCursor
;
1042 readState
.mReadLimit
= aWriteCursor
;
1046 void nsPipe::UpdateAllReadCursors(char* aWriteCursor
) {
1047 mReentrantMonitor
.AssertCurrentThreadIn();
1048 for (uint32_t i
= 0; i
< mInputList
.Length(); ++i
) {
1049 nsPipeReadState
& readState
= mInputList
[i
]->ReadState();
1050 if (mWriteSegment
== readState
.mSegment
&&
1051 readState
.mReadLimit
== mWriteCursor
) {
1052 readState
.mReadLimit
= aWriteCursor
;
1057 void nsPipe::ValidateAllReadCursors() {
1058 mReentrantMonitor
.AssertCurrentThreadIn();
1059 // The only way mReadCursor == mWriteCursor is if:
1061 // - mReadCursor is at the start of a segment (which, based on how
1062 // nsSegmentedBuffer works, means that this segment is the "first"
1064 // - mWriteCursor points at the location past the end of the current
1065 // write segment (so the current write filled the current write
1066 // segment, so we've incremented mWriteCursor to point past the end
1068 // - the segment to which data has just been written is located
1069 // exactly one segment's worth of bytes before the first segment
1070 // where mReadCursor is located
1072 // Consequently, the byte immediately after the end of the current
1073 // write segment is the first byte of the first segment, so
1074 // mReadCursor == mWriteCursor. (Another way to think about this is
1075 // to consider the buffer architecture diagram above, but consider it
1076 // with an arena allocator which allocates from the *end* of the
1077 // arena to the *beginning* of the arena.)
1079 for (uint32_t i
= 0; i
< mInputList
.Length(); ++i
) {
1080 const nsPipeReadState
& state
= mInputList
[i
]->ReadState();
1081 MOZ_ASSERT(state
.mReadCursor
!= mWriteCursor
||
1082 (mBuffer
.GetSegment(state
.mSegment
) == state
.mReadCursor
&&
1083 mWriteCursor
== mWriteLimit
));
1088 uint32_t nsPipe::GetBufferSegmentCount(
1089 const nsPipeReadState
& aReadState
,
1090 const ReentrantMonitorAutoEnter
& ev
) const {
1091 // The write segment can be smaller than the current reader position
1092 // in some cases. For example, when the first write segment has not
1093 // been allocated yet mWriteSegment is negative. In these cases
1094 // the stream is effectively using zero segments.
1095 if (mWriteSegment
< aReadState
.mSegment
) {
1099 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment
>= 0);
1100 MOZ_DIAGNOSTIC_ASSERT(aReadState
.mSegment
>= 0);
1102 // Otherwise at least one segment is being used. We add one here
1103 // since a single segment is being used when the write and read
1104 // segment indices are the same.
1105 return 1 + mWriteSegment
- aReadState
.mSegment
;
1108 bool nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter
& ev
) const {
1109 // If we have fewer total segments than the limit we can immediately
1110 // determine we are not full. Note, we must add one to mWriteSegment
1111 // to convert from a index to a count.
1112 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment
>= -1);
1113 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment
< INT32_MAX
);
1114 uint32_t totalWriteSegments
= mWriteSegment
+ 1;
1115 if (totalWriteSegments
< mMaxAdvanceBufferSegmentCount
) {
1119 // Otherwise we must inspect all of our reader streams. We need
1120 // to determine the buffer depth of the fastest reader.
1121 uint32_t minBufferSegments
= UINT32_MAX
;
1122 for (uint32_t i
= 0; i
< mInputList
.Length(); ++i
) {
1123 // Only count buffer segments from input streams that are open.
1124 mInputList
[i
]->Monitor().AssertCurrentThreadIn();
1125 if (NS_FAILED(mInputList
[i
]->Status(ev
))) {
1128 const nsPipeReadState
& state
= mInputList
[i
]->ReadState();
1129 uint32_t bufferSegments
= GetBufferSegmentCount(state
, ev
);
1130 minBufferSegments
= std::min(minBufferSegments
, bufferSegments
);
1131 // We only care if any reader has fewer segments buffered than
1132 // our threshold. We can stop once we hit that threshold.
1133 if (minBufferSegments
< mMaxAdvanceBufferSegmentCount
) {
1138 // Note, its possible for minBufferSegments to exceed our
1139 // mMaxAdvanceBufferSegmentCount here. This happens when a cloned
1140 // reader gets far behind, but then the fastest reader stream is
1141 // closed. This leaves us with a single stream that is buffered
1142 // beyond our max. Naturally we continue to indicate the pipe
1143 // is full at this point.
1148 //-----------------------------------------------------------------------------
1149 // nsPipeEvents methods:
1150 //-----------------------------------------------------------------------------
1152 nsPipeEvents::~nsPipeEvents() {
1153 // dispatch any pending events
1154 for (auto& callback
: mCallbacks
) {
1160 //-----------------------------------------------------------------------------
1161 // nsPipeInputStream methods:
1162 //-----------------------------------------------------------------------------
1164 NS_IMPL_ADDREF(nsPipeInputStream
);
1165 NS_IMPL_RELEASE(nsPipeInputStream
);
1167 NS_INTERFACE_TABLE_HEAD(nsPipeInputStream
)
1168 NS_INTERFACE_TABLE_BEGIN
1169 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream
, nsIAsyncInputStream
)
1170 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream
, nsITellableStream
)
1171 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream
, nsISearchableInputStream
)
1172 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream
, nsICloneableInputStream
)
1173 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream
, nsIBufferedInputStream
)
1174 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream
, nsIClassInfo
)
1175 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream
, nsIInputStreamPriority
)
1176 NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream
, nsIInputStream
,
1177 nsIAsyncInputStream
)
1178 NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream
, nsISupports
,
1179 nsIAsyncInputStream
)
1180 NS_INTERFACE_TABLE_END
1181 NS_INTERFACE_TABLE_TAIL
1183 NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream
, nsIInputStream
,
1184 nsIAsyncInputStream
, nsITellableStream
,
1185 nsISearchableInputStream
, nsICloneableInputStream
,
1186 nsIBufferedInputStream
)
1188 NS_IMPL_THREADSAFE_CI(nsPipeInputStream
)
1191 nsPipeInputStream::Init(nsIInputStream
*, uint32_t) {
1193 "nsPipeInputStream should never be initialized with "
1194 "nsIBufferedInputStream::Init!\n");
1198 nsPipeInputStream::GetData(nsIInputStream
** aResult
) {
1199 // as this was not created with init() we are not
1200 // wrapping anything
1201 return NS_ERROR_NOT_IMPLEMENTED
;
1204 uint32_t nsPipeInputStream::Available() {
1205 mPipe
->mReentrantMonitor
.AssertCurrentThreadIn();
1206 return mReadState
.mAvailable
;
1209 nsresult
nsPipeInputStream::Wait() {
1210 MOZ_DIAGNOSTIC_ASSERT(mBlocking
);
1212 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1214 while (NS_SUCCEEDED(Status(mon
)) && (mReadState
.mAvailable
== 0)) {
1215 LOG(("III pipe input: waiting for data\n"));
1221 LOG(("III pipe input: woke up [status=%" PRIx32
" available=%u]\n",
1222 static_cast<uint32_t>(Status(mon
)), mReadState
.mAvailable
));
1225 return Status(mon
) == NS_BASE_STREAM_CLOSED
? NS_OK
: Status(mon
);
1228 MonitorAction
nsPipeInputStream::OnInputReadable(
1229 uint32_t aBytesWritten
, nsPipeEvents
& aEvents
,
1230 const ReentrantMonitorAutoEnter
& ev
) {
1231 MonitorAction result
= DoNotNotifyMonitor
;
1233 mPipe
->mReentrantMonitor
.AssertCurrentThreadIn();
1234 mReadState
.mAvailable
+= aBytesWritten
;
1236 if (mCallback
&& !(mCallback
.Flags() & WAIT_CLOSURE_ONLY
)) {
1237 aEvents
.NotifyReady(std::move(mCallback
));
1238 } else if (mBlocked
) {
1239 result
= NotifyMonitor
;
1245 MonitorAction
nsPipeInputStream::OnInputException(
1246 nsresult aReason
, nsPipeEvents
& aEvents
,
1247 const ReentrantMonitorAutoEnter
& ev
) {
1248 LOG(("nsPipeInputStream::OnInputException [this=%p reason=%" PRIx32
"]\n",
1249 this, static_cast<uint32_t>(aReason
)));
1251 MonitorAction result
= DoNotNotifyMonitor
;
1253 MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason
));
1255 if (NS_SUCCEEDED(mInputStatus
)) {
1256 mInputStatus
= aReason
;
1259 // force count of available bytes to zero.
1260 mPipe
->DrainInputStream(mReadState
, aEvents
);
1263 aEvents
.NotifyReady(std::move(mCallback
));
1264 } else if (mBlocked
) {
1265 result
= NotifyMonitor
;
1272 nsPipeInputStream::CloseWithStatus(nsresult aReason
) {
1273 LOG(("III CloseWithStatus [this=%p reason=%" PRIx32
"]\n", this,
1274 static_cast<uint32_t>(aReason
)));
1276 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1278 if (NS_FAILED(mInputStatus
)) {
1282 if (NS_SUCCEEDED(aReason
)) {
1283 aReason
= NS_BASE_STREAM_CLOSED
;
1286 mPipe
->OnInputStreamException(this, aReason
);
1291 nsPipeInputStream::SetPriority(uint32_t priority
) {
1292 mPriority
= priority
;
1297 nsPipeInputStream::GetPriority(uint32_t* priority
) {
1298 *priority
= mPriority
;
1303 nsPipeInputStream::Close() { return CloseWithStatus(NS_BASE_STREAM_CLOSED
); }
1306 nsPipeInputStream::Available(uint64_t* aResult
) {
1307 // nsPipeInputStream supports under 4GB stream only
1308 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1310 // return error if closed
1311 if (!mReadState
.mAvailable
&& NS_FAILED(Status(mon
))) {
1315 *aResult
= (uint64_t)mReadState
.mAvailable
;
1320 nsPipeInputStream::StreamStatus() {
1321 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1322 return mReadState
.mAvailable
? NS_OK
: Status(mon
);
1326 nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
1327 uint32_t aCount
, uint32_t* aReadCount
) {
1328 LOG(("III ReadSegments [this=%p count=%u]\n", this, aCount
));
1330 nsresult rv
= NS_OK
;
1334 AutoReadSegment
segment(mPipe
, mReadState
, aCount
);
1335 rv
= segment
.Status();
1336 if (NS_FAILED(rv
)) {
1337 // ignore this error if we've already read something.
1338 if (*aReadCount
> 0) {
1342 if (rv
== NS_BASE_STREAM_WOULD_BLOCK
) {
1347 // wait for some data to be written to the pipe
1349 if (NS_SUCCEEDED(rv
)) {
1353 // ignore this error, just return.
1354 if (rv
== NS_BASE_STREAM_CLOSED
) {
1358 mPipe
->OnInputStreamException(this, rv
);
1362 uint32_t writeCount
;
1363 while (segment
.Length()) {
1366 rv
= aWriter(static_cast<nsIAsyncInputStream
*>(this), aClosure
,
1367 segment
.Data(), *aReadCount
, segment
.Length(), &writeCount
);
1369 if (NS_FAILED(rv
) || writeCount
== 0) {
1371 // any errors returned from the writer end here: do not
1372 // propagate to the caller of ReadSegments.
1377 MOZ_DIAGNOSTIC_ASSERT(writeCount
<= segment
.Length());
1378 segment
.Advance(writeCount
);
1379 aCount
-= writeCount
;
1380 *aReadCount
+= writeCount
;
1381 mLogicalOffset
+= writeCount
;
1389 nsPipeInputStream::Read(char* aToBuf
, uint32_t aBufLen
, uint32_t* aReadCount
) {
1390 return ReadSegments(NS_CopySegmentToBuffer
, aToBuf
, aBufLen
, aReadCount
);
1394 nsPipeInputStream::IsNonBlocking(bool* aNonBlocking
) {
1395 *aNonBlocking
= !mBlocking
;
1400 nsPipeInputStream::AsyncWait(nsIInputStreamCallback
* aCallback
, uint32_t aFlags
,
1401 uint32_t aRequestedCount
,
1402 nsIEventTarget
* aTarget
) {
1403 LOG(("III AsyncWait [this=%p]\n", this));
1405 nsPipeEvents pipeEvents
;
1407 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1409 // replace a pending callback
1410 mCallback
= nullptr;
1416 CallbackHolder
callback(this, aCallback
, aFlags
, aTarget
);
1418 if (NS_FAILED(Status(mon
)) ||
1419 (mReadState
.mAvailable
&& !(aFlags
& WAIT_CLOSURE_ONLY
))) {
1420 // stream is already closed or readable; post event.
1421 pipeEvents
.NotifyReady(std::move(callback
));
1423 // queue up callback object to be notified when data becomes available
1424 mCallback
= std::move(callback
);
1431 nsPipeInputStream::Tell(int64_t* aOffset
) {
1432 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1434 // return error if closed
1435 if (!mReadState
.mAvailable
&& NS_FAILED(Status(mon
))) {
1439 *aOffset
= mLogicalOffset
;
1443 static bool strings_equal(bool aIgnoreCase
, const char* aS1
, const char* aS2
,
1445 return aIgnoreCase
? !nsCRT::strncasecmp(aS1
, aS2
, aLen
)
1446 : !strncmp(aS1
, aS2
, aLen
);
1450 nsPipeInputStream::Search(const char* aForString
, bool aIgnoreCase
,
1451 bool* aFound
, uint32_t* aOffsetSearchedTo
) {
1452 LOG(("III Search [for=%s ic=%u]\n", aForString
, aIgnoreCase
));
1454 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1458 uint32_t index
= 0, offset
= 0;
1459 uint32_t strLen
= strlen(aForString
);
1461 mPipe
->PeekSegment(mReadState
, 0, cursor1
, limit1
);
1462 if (cursor1
== limit1
) {
1464 *aOffsetSearchedTo
= 0;
1465 LOG((" result [aFound=%u offset=%u]\n", *aFound
, *aOffsetSearchedTo
));
1470 uint32_t i
, len1
= limit1
- cursor1
;
1472 // check if the string is in the buffer segment
1473 for (i
= 0; i
< len1
- strLen
+ 1; i
++) {
1474 if (strings_equal(aIgnoreCase
, &cursor1
[i
], aForString
, strLen
)) {
1476 *aOffsetSearchedTo
= offset
+ i
;
1477 LOG((" result [aFound=%u offset=%u]\n", *aFound
, *aOffsetSearchedTo
));
1482 // get the next segment
1490 mPipe
->PeekSegment(mReadState
, index
, cursor2
, limit2
);
1491 if (cursor2
== limit2
) {
1493 *aOffsetSearchedTo
= offset
- strLen
+ 1;
1494 LOG((" result [aFound=%u offset=%u]\n", *aFound
, *aOffsetSearchedTo
));
1497 len2
= limit2
- cursor2
;
1499 // check if the string is straddling the next buffer segment
1500 uint32_t lim
= XPCOM_MIN(strLen
, len2
+ 1);
1501 for (i
= 0; i
< lim
; ++i
) {
1502 uint32_t strPart1Len
= strLen
- i
- 1;
1503 uint32_t strPart2Len
= strLen
- strPart1Len
;
1504 const char* strPart2
= &aForString
[strLen
- strPart2Len
];
1505 uint32_t bufSeg1Offset
= len1
- strPart1Len
;
1506 if (strings_equal(aIgnoreCase
, &cursor1
[bufSeg1Offset
], aForString
,
1508 strings_equal(aIgnoreCase
, cursor2
, strPart2
, strPart2Len
)) {
1510 *aOffsetSearchedTo
= offset
- strPart1Len
;
1511 LOG((" result [aFound=%u offset=%u]\n", *aFound
, *aOffsetSearchedTo
));
1516 // finally continue with the next buffer
1521 MOZ_ASSERT_UNREACHABLE("can't get here");
1522 return NS_ERROR_UNEXPECTED
; // keep compiler happy
1526 nsPipeInputStream::GetCloneable(bool* aCloneableOut
) {
1527 *aCloneableOut
= true;
1532 nsPipeInputStream::Clone(nsIInputStream
** aCloneOut
) {
1533 return mPipe
->CloneInputStream(this, aCloneOut
);
1536 nsresult
nsPipeInputStream::Status(const ReentrantMonitorAutoEnter
& ev
) const {
1537 if (NS_FAILED(mInputStatus
)) {
1538 return mInputStatus
;
1541 if (mReadState
.mAvailable
) {
1542 // Still something to read and this input stream state is OK.
1546 // Nothing to read, just fall through to the pipe's state that
1547 // may reflect state of its output stream side (already closed).
1548 return mPipe
->mStatus
;
1551 nsresult
nsPipeInputStream::Status() const {
1552 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1556 nsPipeInputStream::~nsPipeInputStream() { Close(); }
1558 //-----------------------------------------------------------------------------
1559 // nsPipeOutputStream methods:
1560 //-----------------------------------------------------------------------------
1562 NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream
, nsIOutputStream
,
1563 nsIAsyncOutputStream
, nsIClassInfo
)
1565 NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream
, nsIOutputStream
,
1566 nsIAsyncOutputStream
)
1568 NS_IMPL_THREADSAFE_CI(nsPipeOutputStream
)
1570 nsresult
nsPipeOutputStream::Wait() {
1571 MOZ_DIAGNOSTIC_ASSERT(mBlocking
);
1573 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1575 if (NS_SUCCEEDED(mPipe
->mStatus
) && !mWritable
) {
1576 LOG(("OOO pipe output: waiting for space\n"));
1580 LOG(("OOO pipe output: woke up [pipe-status=%" PRIx32
" writable=%u]\n",
1581 static_cast<uint32_t>(mPipe
->mStatus
), mWritable
));
1584 return mPipe
->mStatus
== NS_BASE_STREAM_CLOSED
? NS_OK
: mPipe
->mStatus
;
1587 MonitorAction
nsPipeOutputStream::OnOutputWritable(nsPipeEvents
& aEvents
) {
1588 MonitorAction result
= DoNotNotifyMonitor
;
1592 if (mCallback
&& !(mCallback
.Flags() & WAIT_CLOSURE_ONLY
)) {
1593 aEvents
.NotifyReady(std::move(mCallback
));
1594 } else if (mBlocked
) {
1595 result
= NotifyMonitor
;
1601 MonitorAction
nsPipeOutputStream::OnOutputException(nsresult aReason
,
1602 nsPipeEvents
& aEvents
) {
1603 LOG(("nsPipeOutputStream::OnOutputException [this=%p reason=%" PRIx32
"]\n",
1604 this, static_cast<uint32_t>(aReason
)));
1606 MonitorAction result
= DoNotNotifyMonitor
;
1608 MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason
));
1612 aEvents
.NotifyReady(std::move(mCallback
));
1613 } else if (mBlocked
) {
1614 result
= NotifyMonitor
;
1620 NS_IMETHODIMP_(MozExternalRefCountType
)
1621 nsPipeOutputStream::AddRef() {
1623 return mPipe
->AddRef();
1626 NS_IMETHODIMP_(MozExternalRefCountType
)
1627 nsPipeOutputStream::Release() {
1628 if (--mWriterRefCnt
== 0) {
1631 return mPipe
->Release();
1635 nsPipeOutputStream::CloseWithStatus(nsresult aReason
) {
1636 LOG(("OOO CloseWithStatus [this=%p reason=%" PRIx32
"]\n", this,
1637 static_cast<uint32_t>(aReason
)));
1639 if (NS_SUCCEEDED(aReason
)) {
1640 aReason
= NS_BASE_STREAM_CLOSED
;
1643 // input stream may remain open
1644 mPipe
->OnPipeException(aReason
, true);
1649 nsPipeOutputStream::Close() { return CloseWithStatus(NS_BASE_STREAM_CLOSED
); }
1652 nsPipeOutputStream::WriteSegments(nsReadSegmentFun aReader
, void* aClosure
,
1653 uint32_t aCount
, uint32_t* aWriteCount
) {
1654 LOG(("OOO WriteSegments [this=%p count=%u]\n", this, aCount
));
1656 nsresult rv
= NS_OK
;
1659 uint32_t segmentLen
;
1663 rv
= mPipe
->GetWriteSegment(segment
, segmentLen
);
1664 if (NS_FAILED(rv
)) {
1665 if (rv
== NS_BASE_STREAM_WOULD_BLOCK
) {
1668 // ignore this error if we've already written something
1669 if (*aWriteCount
> 0) {
1674 // wait for the pipe to have an empty segment.
1676 if (NS_SUCCEEDED(rv
)) {
1680 mPipe
->OnPipeException(rv
);
1684 // write no more than aCount
1685 if (segmentLen
> aCount
) {
1686 segmentLen
= aCount
;
1689 uint32_t readCount
, originalLen
= segmentLen
;
1690 while (segmentLen
) {
1693 rv
= aReader(this, aClosure
, segment
, *aWriteCount
, segmentLen
,
1696 if (NS_FAILED(rv
) || readCount
== 0) {
1698 // any errors returned from the aReader end here: do not
1699 // propagate to the caller of WriteSegments.
1704 MOZ_DIAGNOSTIC_ASSERT(readCount
<= segmentLen
);
1705 segment
+= readCount
;
1706 segmentLen
-= readCount
;
1707 aCount
-= readCount
;
1708 *aWriteCount
+= readCount
;
1709 mLogicalOffset
+= readCount
;
1712 if (segmentLen
< originalLen
) {
1713 mPipe
->AdvanceWriteCursor(originalLen
- segmentLen
);
1721 nsPipeOutputStream::Write(const char* aFromBuf
, uint32_t aBufLen
,
1722 uint32_t* aWriteCount
) {
1723 return WriteSegments(NS_CopyBufferToSegment
, (void*)aFromBuf
, aBufLen
,
1728 nsPipeOutputStream::Flush() {
1734 nsPipeOutputStream::StreamStatus() {
1735 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1736 return mPipe
->mStatus
;
1740 nsPipeOutputStream::WriteFrom(nsIInputStream
* aFromStream
, uint32_t aCount
,
1741 uint32_t* aWriteCount
) {
1742 return WriteSegments(NS_CopyStreamToSegment
, aFromStream
, aCount
,
1747 nsPipeOutputStream::IsNonBlocking(bool* aNonBlocking
) {
1748 *aNonBlocking
= !mBlocking
;
1753 nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback
* aCallback
,
1754 uint32_t aFlags
, uint32_t aRequestedCount
,
1755 nsIEventTarget
* aTarget
) {
1756 LOG(("OOO AsyncWait [this=%p]\n", this));
1758 nsPipeEvents pipeEvents
;
1760 ReentrantMonitorAutoEnter
mon(mPipe
->mReentrantMonitor
);
1762 // replace a pending callback
1763 mCallback
= nullptr;
1769 CallbackHolder
callback(this, aCallback
, aFlags
, aTarget
);
1771 if (NS_FAILED(mPipe
->mStatus
) ||
1772 (mWritable
&& !(aFlags
& WAIT_CLOSURE_ONLY
))) {
1773 // stream is already closed or writable; post event.
1774 pipeEvents
.NotifyReady(std::move(callback
));
1776 // queue up callback object to be notified when data becomes available
1777 mCallback
= std::move(callback
);
1783 ////////////////////////////////////////////////////////////////////////////////
1785 void NS_NewPipe(nsIInputStream
** aPipeIn
, nsIOutputStream
** aPipeOut
,
1786 uint32_t aSegmentSize
, uint32_t aMaxSize
,
1787 bool aNonBlockingInput
, bool aNonBlockingOutput
) {
1788 if (aSegmentSize
== 0) {
1789 aSegmentSize
= DEFAULT_SEGMENT_SIZE
;
1792 // Handle aMaxSize of UINT32_MAX as a special case
1793 uint32_t segmentCount
;
1794 if (aMaxSize
== UINT32_MAX
) {
1795 segmentCount
= UINT32_MAX
;
1797 segmentCount
= aMaxSize
/ aSegmentSize
;
1800 nsIAsyncInputStream
* in
;
1801 nsIAsyncOutputStream
* out
;
1802 NS_NewPipe2(&in
, &out
, aNonBlockingInput
, aNonBlockingOutput
, aSegmentSize
,
1809 // Disable thread safety analysis as this is logically a constructor, and no
1810 // additional threads can observe these objects yet.
1811 void NS_NewPipe2(nsIAsyncInputStream
** aPipeIn
, nsIAsyncOutputStream
** aPipeOut
,
1812 bool aNonBlockingInput
, bool aNonBlockingOutput
,
1813 uint32_t aSegmentSize
,
1814 uint32_t aSegmentCount
) MOZ_NO_THREAD_SAFETY_ANALYSIS
{
1815 RefPtr
<nsPipe
> pipe
=
1816 new nsPipe(aSegmentSize
? aSegmentSize
: DEFAULT_SEGMENT_SIZE
,
1817 aSegmentCount
? aSegmentCount
: DEFAULT_SEGMENT_COUNT
);
1819 RefPtr
<nsPipeInputStream
> pipeIn
= new nsPipeInputStream(pipe
);
1820 pipe
->mInputList
.AppendElement(pipeIn
);
1821 RefPtr
<nsPipeOutputStream
> pipeOut
= &pipe
->mOutput
;
1823 pipeIn
->SetNonBlocking(aNonBlockingInput
);
1824 pipeOut
->SetNonBlocking(aNonBlockingOutput
);
1826 pipeIn
.forget(aPipeIn
);
1827 pipeOut
.forget(aPipeOut
);
1830 ////////////////////////////////////////////////////////////////////////////////
1832 // Thin nsIPipe implementation for consumers of the component manager interface
1833 // for creating pipes. Acts as a thin wrapper around NS_NewPipe2 for JS callers.
1834 class nsPipeHolder final
: public nsIPipe
{
1836 NS_DECL_THREADSAFE_ISUPPORTS
1840 ~nsPipeHolder() = default;
1842 nsCOMPtr
<nsIAsyncInputStream
> mInput
;
1843 nsCOMPtr
<nsIAsyncOutputStream
> mOutput
;
1846 NS_IMPL_ISUPPORTS(nsPipeHolder
, nsIPipe
)
1849 nsPipeHolder::Init(bool aNonBlockingInput
, bool aNonBlockingOutput
,
1850 uint32_t aSegmentSize
, uint32_t aSegmentCount
) {
1851 if (mInput
|| mOutput
) {
1852 return NS_ERROR_ALREADY_INITIALIZED
;
1854 NS_NewPipe2(getter_AddRefs(mInput
), getter_AddRefs(mOutput
),
1855 aNonBlockingInput
, aNonBlockingOutput
, aSegmentSize
,
1861 nsPipeHolder::GetInputStream(nsIAsyncInputStream
** aInputStream
) {
1863 *aInputStream
= do_AddRef(mInput
).take();
1866 return NS_ERROR_NOT_INITIALIZED
;
1870 nsPipeHolder::GetOutputStream(nsIAsyncOutputStream
** aOutputStream
) {
1872 *aOutputStream
= do_AddRef(mOutput
).take();
1875 return NS_ERROR_NOT_INITIALIZED
;
1878 nsresult
nsPipeConstructor(REFNSIID aIID
, void** aResult
) {
1879 RefPtr
<nsPipeHolder
> pipe
= new nsPipeHolder();
1880 nsresult rv
= pipe
->QueryInterface(aIID
, aResult
);
1884 ////////////////////////////////////////////////////////////////////////////////