Bug 1867190 - Add prefs for PHC probabilities r=glandium
[gecko.git] / xpcom / io / nsPipe3.cpp
blob3d7486e673871ba21966c8b6ea8f8fc7e7cce741
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include <algorithm>
8 #include "mozilla/Attributes.h"
9 #include "mozilla/IntegerPrintfMacros.h"
10 #include "mozilla/ReentrantMonitor.h"
11 #include "nsIBufferedStreams.h"
12 #include "nsICloneableInputStream.h"
13 #include "nsIPipe.h"
14 #include "nsIEventTarget.h"
15 #include "nsITellableStream.h"
16 #include "mozilla/RefPtr.h"
17 #include "nsSegmentedBuffer.h"
18 #include "nsStreamUtils.h"
19 #include "nsString.h"
20 #include "nsCOMPtr.h"
21 #include "nsCRT.h"
22 #include "mozilla/Logging.h"
23 #include "nsIClassInfoImpl.h"
24 #include "nsAlgorithm.h"
25 #include "nsPipe.h"
26 #include "nsIAsyncInputStream.h"
27 #include "nsIAsyncOutputStream.h"
28 #include "nsIInputStreamPriority.h"
29 #include "nsThreadUtils.h"
31 using namespace mozilla;
33 #ifdef LOG
34 # undef LOG
35 #endif
37 // set MOZ_LOG=nsPipe:5
39 static LazyLogModule sPipeLog("nsPipe");
40 #define LOG(args) MOZ_LOG(sPipeLog, mozilla::LogLevel::Debug, args)
42 #define DEFAULT_SEGMENT_SIZE 4096
43 #define DEFAULT_SEGMENT_COUNT 16
45 class nsPipe;
46 class nsPipeEvents;
47 class nsPipeInputStream;
48 class nsPipeOutputStream;
49 class AutoReadSegment;
namespace {

// Result of a stream notification hook. Callers in this file call
// NotifyAll() on the pipe monitor when they receive NotifyMonitor.
enum MonitorAction {
  DoNotNotifyMonitor,
  NotifyMonitor
};

// Result of nsPipe::AdvanceReadSegment(). SegmentAdvanceBufferRead is
// returned when the reader's buffered segment count dropped from at or above
// the advance-buffer limit to below it.
enum SegmentChangeResult {
  SegmentNotChanged,
  SegmentAdvanceBufferRead
};

}  // namespace
59 //-----------------------------------------------------------------------------
61 class CallbackHolder {
62 public:
63 CallbackHolder() = default;
64 MOZ_IMPLICIT CallbackHolder(std::nullptr_t) {}
66 CallbackHolder(nsIAsyncInputStream* aStream,
67 nsIInputStreamCallback* aCallback, uint32_t aFlags,
68 nsIEventTarget* aEventTarget)
69 : mRunnable(aCallback ? NS_NewCancelableRunnableFunction(
70 "nsPipeInputStream AsyncWait Callback",
71 [stream = nsCOMPtr{aStream},
72 callback = nsCOMPtr{aCallback}]() {
73 callback->OnInputStreamReady(stream);
75 : nullptr),
76 mEventTarget(aEventTarget),
77 mFlags(aFlags) {}
79 CallbackHolder(nsIAsyncOutputStream* aStream,
80 nsIOutputStreamCallback* aCallback, uint32_t aFlags,
81 nsIEventTarget* aEventTarget)
82 : mRunnable(aCallback ? NS_NewCancelableRunnableFunction(
83 "nsPipeOutputStream AsyncWait Callback",
84 [stream = nsCOMPtr{aStream},
85 callback = nsCOMPtr{aCallback}]() {
86 callback->OnOutputStreamReady(stream);
88 : nullptr),
89 mEventTarget(aEventTarget),
90 mFlags(aFlags) {}
92 CallbackHolder(const CallbackHolder&) = delete;
93 CallbackHolder(CallbackHolder&&) = default;
94 CallbackHolder& operator=(const CallbackHolder&) = delete;
95 CallbackHolder& operator=(CallbackHolder&&) = default;
97 CallbackHolder& operator=(std::nullptr_t) {
98 mRunnable = nullptr;
99 mEventTarget = nullptr;
100 mFlags = 0;
101 return *this;
104 MOZ_IMPLICIT operator bool() const { return mRunnable; }
106 uint32_t Flags() const {
107 MOZ_ASSERT(mRunnable, "Should only be called when a callback is present");
108 return mFlags;
111 void Notify() {
112 nsCOMPtr<nsIRunnable> runnable = mRunnable.forget();
113 nsCOMPtr<nsIEventTarget> eventTarget = mEventTarget.forget();
114 if (runnable) {
115 if (eventTarget) {
116 eventTarget->Dispatch(runnable.forget());
117 } else {
118 runnable->Run();
123 private:
124 nsCOMPtr<nsIRunnable> mRunnable;
125 nsCOMPtr<nsIEventTarget> mEventTarget;
126 uint32_t mFlags = 0;
129 //-----------------------------------------------------------------------------
131 // this class is used to delay notifications until the end of a particular
132 // scope. it helps avoid the complexity of issuing callbacks while inside
133 // a critical section.
134 class nsPipeEvents {
135 public:
136 nsPipeEvents() = default;
137 ~nsPipeEvents();
139 inline void NotifyReady(CallbackHolder aCallback) {
140 mCallbacks.AppendElement(std::move(aCallback));
143 private:
144 nsTArray<CallbackHolder> mCallbacks;
147 //-----------------------------------------------------------------------------
149 // This class is used to maintain input stream state. Its broken out from the
150 // nsPipeInputStream class because generally the nsPipe should be modifying
151 // this state and not the input stream itself.
152 struct nsPipeReadState {
153 nsPipeReadState()
154 : mReadCursor(nullptr),
155 mReadLimit(nullptr),
156 mSegment(0),
157 mAvailable(0),
158 mActiveRead(false),
159 mNeedDrain(false) {}
161 // All members of this type are guarded by the pipe monitor, however it cannot
162 // be named from this type, so the less-reliable MOZ_GUARDED_VAR is used
163 // instead. In the future it would be nice to avoid this, especially as
164 // MOZ_GUARDED_VAR is deprecated.
165 char* mReadCursor MOZ_GUARDED_VAR;
166 char* mReadLimit MOZ_GUARDED_VAR;
167 int32_t mSegment MOZ_GUARDED_VAR;
168 uint32_t mAvailable MOZ_GUARDED_VAR;
170 // This flag is managed using the AutoReadSegment RAII stack class.
171 bool mActiveRead MOZ_GUARDED_VAR;
173 // Set to indicate that the input stream has closed and should be drained,
174 // but that drain has been delayed due to an active read. When the read
175 // completes, this flag indicate the drain should then be performed.
176 bool mNeedDrain MOZ_GUARDED_VAR;
179 //-----------------------------------------------------------------------------
181 // an input end of a pipe (maintained as a list of refs within the pipe)
182 class nsPipeInputStream final : public nsIAsyncInputStream,
183 public nsITellableStream,
184 public nsISearchableInputStream,
185 public nsICloneableInputStream,
186 public nsIClassInfo,
187 public nsIBufferedInputStream,
188 public nsIInputStreamPriority {
189 public:
190 NS_DECL_THREADSAFE_ISUPPORTS
191 NS_DECL_NSIINPUTSTREAM
192 NS_DECL_NSIASYNCINPUTSTREAM
193 NS_DECL_NSITELLABLESTREAM
194 NS_DECL_NSISEARCHABLEINPUTSTREAM
195 NS_DECL_NSICLONEABLEINPUTSTREAM
196 NS_DECL_NSICLASSINFO
197 NS_DECL_NSIBUFFEREDINPUTSTREAM
198 NS_DECL_NSIINPUTSTREAMPRIORITY
200 explicit nsPipeInputStream(nsPipe* aPipe)
201 : mPipe(aPipe),
202 mLogicalOffset(0),
203 mInputStatus(NS_OK),
204 mBlocking(true),
205 mBlocked(false),
206 mPriority(nsIRunnablePriority::PRIORITY_NORMAL) {}
208 nsPipeInputStream(const nsPipeInputStream& aOther)
209 : mPipe(aOther.mPipe),
210 mLogicalOffset(aOther.mLogicalOffset),
211 mInputStatus(aOther.mInputStatus),
212 mBlocking(aOther.mBlocking),
213 mBlocked(false),
214 mReadState(aOther.mReadState),
215 mPriority(nsIRunnablePriority::PRIORITY_NORMAL) {}
217 void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
219 uint32_t Available() MOZ_REQUIRES(Monitor());
221 // synchronously wait for the pipe to become readable.
222 nsresult Wait();
224 // These two don't acquire the monitor themselves. Instead they
225 // expect their caller to have done so and to pass the monitor as
226 // evidence.
227 MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&,
228 const ReentrantMonitorAutoEnter& ev)
229 MOZ_REQUIRES(Monitor());
230 MonitorAction OnInputException(nsresult, nsPipeEvents&,
231 const ReentrantMonitorAutoEnter& ev)
232 MOZ_REQUIRES(Monitor());
234 nsPipeReadState& ReadState() { return mReadState; }
236 const nsPipeReadState& ReadState() const { return mReadState; }
238 nsresult Status() const;
240 // A version of Status() that doesn't acquire the monitor.
241 nsresult Status(const ReentrantMonitorAutoEnter& ev) const
242 MOZ_REQUIRES(Monitor());
244 // The status of this input stream, ignoring the status of the underlying
245 // monitor. If this status is errored, the input stream has either already
246 // been removed from the pipe, or will be removed from the pipe shortly.
247 nsresult InputStatus(const ReentrantMonitorAutoEnter&) const
248 MOZ_REQUIRES(Monitor()) {
249 return mInputStatus;
252 ReentrantMonitor& Monitor() const;
254 private:
255 virtual ~nsPipeInputStream();
257 RefPtr<nsPipe> mPipe;
259 int64_t mLogicalOffset;
260 // Individual input streams can be closed without effecting the rest of the
261 // pipe. So track individual input stream status separately. |mInputStatus|
262 // is protected by |mPipe->mReentrantMonitor|.
263 nsresult mInputStatus MOZ_GUARDED_BY(Monitor());
264 bool mBlocking;
266 // these variables can only be accessed while inside the pipe's monitor
267 bool mBlocked MOZ_GUARDED_BY(Monitor());
268 CallbackHolder mCallback MOZ_GUARDED_BY(Monitor());
270 // requires pipe's monitor to access members; usually treat as an opaque token
271 // to pass to nsPipe
272 nsPipeReadState mReadState;
273 Atomic<uint32_t, Relaxed> mPriority;
276 //-----------------------------------------------------------------------------
278 // the output end of a pipe (allocated as a member of the pipe).
279 class nsPipeOutputStream : public nsIAsyncOutputStream, public nsIClassInfo {
280 public:
281 // since this class will be allocated as a member of the pipe, we do not
282 // need our own ref count. instead, we share the lifetime (the ref count)
283 // of the entire pipe. this macro is just convenience since it does not
284 // declare a mRefCount variable; however, don't let the name fool you...
285 // we are not inheriting from nsPipe ;-)
286 NS_DECL_ISUPPORTS_INHERITED
288 NS_DECL_NSIOUTPUTSTREAM
289 NS_DECL_NSIASYNCOUTPUTSTREAM
290 NS_DECL_NSICLASSINFO
292 explicit nsPipeOutputStream(nsPipe* aPipe)
293 : mPipe(aPipe),
294 mWriterRefCnt(0),
295 mLogicalOffset(0),
296 mBlocking(true),
297 mBlocked(false),
298 mWritable(true) {}
300 void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
301 void SetWritable(bool aWritable) MOZ_REQUIRES(Monitor()) {
302 mWritable = aWritable;
305 // synchronously wait for the pipe to become writable.
306 nsresult Wait();
308 MonitorAction OnOutputWritable(nsPipeEvents&) MOZ_REQUIRES(Monitor());
309 MonitorAction OnOutputException(nsresult, nsPipeEvents&)
310 MOZ_REQUIRES(Monitor());
312 ReentrantMonitor& Monitor() const;
314 private:
315 nsPipe* mPipe;
317 // separate refcnt so that we know when to close the producer
318 ThreadSafeAutoRefCnt mWriterRefCnt;
319 int64_t mLogicalOffset;
320 bool mBlocking;
322 // these variables can only be accessed while inside the pipe's monitor
323 bool mBlocked MOZ_GUARDED_BY(Monitor());
324 bool mWritable MOZ_GUARDED_BY(Monitor());
325 CallbackHolder mCallback MOZ_GUARDED_BY(Monitor());
328 //-----------------------------------------------------------------------------
330 class nsPipe final {
331 public:
332 friend class nsPipeInputStream;
333 friend class nsPipeOutputStream;
334 friend class AutoReadSegment;
336 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(nsPipe)
338 // public constructor
339 friend void NS_NewPipe2(nsIAsyncInputStream**, nsIAsyncOutputStream**, bool,
340 bool, uint32_t, uint32_t);
342 private:
343 nsPipe(uint32_t aSegmentSize, uint32_t aSegmentCount);
344 ~nsPipe();
347 // Methods below may only be called while inside the pipe's monitor. Some
348 // of these methods require passing a ReentrantMonitorAutoEnter to prove the
349 // monitor is held.
352 void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
353 char*& aCursor, char*& aLimit)
354 MOZ_REQUIRES(mReentrantMonitor);
355 SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState,
356 const ReentrantMonitorAutoEnter& ev)
357 MOZ_REQUIRES(mReentrantMonitor);
358 bool ReadSegmentBeingWritten(nsPipeReadState& aReadState)
359 MOZ_REQUIRES(mReentrantMonitor);
360 uint32_t CountSegmentReferences(int32_t aSegment)
361 MOZ_REQUIRES(mReentrantMonitor);
362 void SetAllNullReadCursors() MOZ_REQUIRES(mReentrantMonitor);
363 bool AllReadCursorsMatchWriteCursor() MOZ_REQUIRES(mReentrantMonitor);
364 void RollBackAllReadCursors(char* aWriteCursor)
365 MOZ_REQUIRES(mReentrantMonitor);
366 void UpdateAllReadCursors(char* aWriteCursor) MOZ_REQUIRES(mReentrantMonitor);
367 void ValidateAllReadCursors() MOZ_REQUIRES(mReentrantMonitor);
368 uint32_t GetBufferSegmentCount(const nsPipeReadState& aReadState,
369 const ReentrantMonitorAutoEnter& ev) const
370 MOZ_REQUIRES(mReentrantMonitor);
371 bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const
372 MOZ_REQUIRES(mReentrantMonitor);
375 // methods below may be called while outside the pipe's monitor
378 void DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents);
379 nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
380 void AdvanceWriteCursor(uint32_t aCount);
382 void OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason);
383 void OnPipeException(nsresult aReason, bool aOutputOnly = false);
385 nsresult CloneInputStream(nsPipeInputStream* aOriginal,
386 nsIInputStream** aCloneOut);
388 // methods below should only be called by AutoReadSegment
389 nsresult GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment,
390 uint32_t& aLength);
391 void ReleaseReadSegment(nsPipeReadState& aReadState, nsPipeEvents& aEvents);
392 void AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount);
394 // We can't inherit from both nsIInputStream and nsIOutputStream
395 // because they collide on their Close method. Consequently we nest their
396 // implementations to avoid the extra object allocation.
397 nsPipeOutputStream mOutput;
399 // Since the input stream can be cloned, we may have more than one. Use
400 // a weak reference as the streams will clear their entry here in their
401 // destructor. Using a strong reference would create a reference cycle.
402 // Only usable while mReentrantMonitor is locked.
403 nsTArray<nsPipeInputStream*> mInputList MOZ_GUARDED_BY(mReentrantMonitor);
405 ReentrantMonitor mReentrantMonitor;
406 nsSegmentedBuffer mBuffer MOZ_GUARDED_BY(mReentrantMonitor);
408 // The maximum number of segments to allow to be buffered in advance
409 // of the fastest reader. This is collection of segments is called
410 // the "advance buffer".
411 uint32_t mMaxAdvanceBufferSegmentCount MOZ_GUARDED_BY(mReentrantMonitor);
413 int32_t mWriteSegment MOZ_GUARDED_BY(mReentrantMonitor);
414 char* mWriteCursor MOZ_GUARDED_BY(mReentrantMonitor);
415 char* mWriteLimit MOZ_GUARDED_BY(mReentrantMonitor);
417 // |mStatus| is protected by |mReentrantMonitor|.
418 nsresult mStatus MOZ_GUARDED_BY(mReentrantMonitor);
421 //-----------------------------------------------------------------------------
423 // Declarations of Monitor() methods on the streams.
425 // These must be placed early to provide MOZ_RETURN_CAPABILITY annotations for
426 // the thread-safety analysis. This couldn't be done at the declaration due to
427 // nsPipe not yet being defined.
429 ReentrantMonitor& nsPipeOutputStream::Monitor() const
430 MOZ_RETURN_CAPABILITY(mPipe->mReentrantMonitor) {
431 return mPipe->mReentrantMonitor;
434 ReentrantMonitor& nsPipeInputStream::Monitor() const
435 MOZ_RETURN_CAPABILITY(mPipe->mReentrantMonitor) {
436 return mPipe->mReentrantMonitor;
439 //-----------------------------------------------------------------------------
441 // RAII class representing an active read segment. When it goes out of scope
442 // it automatically updates the read cursor and releases the read segment.
443 class MOZ_STACK_CLASS AutoReadSegment final {
444 public:
445 AutoReadSegment(nsPipe* aPipe, nsPipeReadState& aReadState,
446 uint32_t aMaxLength)
447 : mPipe(aPipe),
448 mReadState(aReadState),
449 mStatus(NS_ERROR_FAILURE),
450 mSegment(nullptr),
451 mLength(0),
452 mOffset(0) {
453 MOZ_DIAGNOSTIC_ASSERT(mPipe);
454 MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead);
455 mStatus = mPipe->GetReadSegment(mReadState, mSegment, mLength);
456 if (NS_SUCCEEDED(mStatus)) {
457 MOZ_DIAGNOSTIC_ASSERT(mReadState.mActiveRead);
458 MOZ_DIAGNOSTIC_ASSERT(mSegment);
459 mLength = std::min(mLength, aMaxLength);
460 MOZ_DIAGNOSTIC_ASSERT(mLength);
464 ~AutoReadSegment() {
465 if (NS_SUCCEEDED(mStatus)) {
466 if (mOffset) {
467 mPipe->AdvanceReadCursor(mReadState, mOffset);
468 } else {
469 nsPipeEvents events;
470 mPipe->ReleaseReadSegment(mReadState, events);
473 MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead);
476 nsresult Status() const { return mStatus; }
478 const char* Data() const {
479 MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
480 MOZ_DIAGNOSTIC_ASSERT(mSegment);
481 return mSegment + mOffset;
484 uint32_t Length() const {
485 MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
486 MOZ_DIAGNOSTIC_ASSERT(mLength >= mOffset);
487 return mLength - mOffset;
490 void Advance(uint32_t aCount) {
491 MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
492 MOZ_DIAGNOSTIC_ASSERT(aCount <= (mLength - mOffset));
493 mOffset += aCount;
496 nsPipeReadState& ReadState() const { return mReadState; }
498 private:
499 // guaranteed to remain alive due to limited stack lifetime of AutoReadSegment
500 nsPipe* mPipe;
501 nsPipeReadState& mReadState;
502 nsresult mStatus;
503 const char* mSegment;
504 uint32_t mLength;
505 uint32_t mOffset;
509 // NOTES on buffer architecture:
511 // +-----------------+ - - mBuffer.GetSegment(0)
512 // | |
513 // + - - - - - - - - + - - nsPipeReadState.mReadCursor
514 // |/////////////////|
515 // |/////////////////|
516 // |/////////////////|
517 // |/////////////////|
518 // +-----------------+ - - nsPipeReadState.mReadLimit
519 // |
520 // +-----------------+
521 // |/////////////////|
522 // |/////////////////|
523 // |/////////////////|
524 // |/////////////////|
525 // |/////////////////|
526 // |/////////////////|
527 // +-----------------+
528 // |
529 // +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
530 // |/////////////////|
531 // |/////////////////|
532 // |/////////////////|
533 // + - - - - - - - - + - - mWriteCursor
534 // | |
535 // | |
536 // +-----------------+ - - mWriteLimit
538 // (shaded region contains data)
540 // NOTE: Each input stream produced by the nsPipe contains its own, separate
541 // nsPipeReadState. This means there are multiple mReadCursor and
542 // mReadLimit values in play. The pipe cannot discard old data until
543 // all mReadCursors have moved beyond that point in the stream.
545 // Likewise, each input stream reader will have its own amount of
546 // buffered data. The pipe size threshold, however, is only applied
547 // to the input stream that is being read fastest. We call this
548 // the "advance buffer" in that it's in advance of all readers. We
549 // allow slower input streams to buffer more data so that we don't
550 // stall processing of the faster input stream.
552 // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
553 // small allocations (e.g., 64 byte allocations). this means that buffers may
554 // be allocated back-to-back. in the diagram above, for example, mReadLimit
555 // would actually be pointing at the beginning of the next segment. when
556 // making changes to this file, please keep this fact in mind.
559 //-----------------------------------------------------------------------------
560 // nsPipe methods:
561 //-----------------------------------------------------------------------------
563 nsPipe::nsPipe(uint32_t aSegmentSize, uint32_t aSegmentCount)
564 : mOutput(this),
565 mReentrantMonitor("nsPipe.mReentrantMonitor"),
566 // protect against overflow
567 mMaxAdvanceBufferSegmentCount(
568 std::min(aSegmentCount, UINT32_MAX / aSegmentSize)),
569 mWriteSegment(-1),
570 mWriteCursor(nullptr),
571 mWriteLimit(nullptr),
572 mStatus(NS_OK) {
573 // The internal buffer is always "infinite" so that we can allow
574 // the size to expand when cloned streams are read at different
575 // rates. We enforce a limit on how much data can be buffered
576 // ahead of the fastest reader in GetWriteSegment().
577 MOZ_ALWAYS_SUCCEEDS(mBuffer.Init(aSegmentSize));
580 nsPipe::~nsPipe() = default;
582 void nsPipe::PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
583 char*& aCursor, char*& aLimit) {
584 if (aIndex == 0) {
585 MOZ_DIAGNOSTIC_ASSERT(!aReadState.mReadCursor || mBuffer.GetSegmentCount());
586 aCursor = aReadState.mReadCursor;
587 aLimit = aReadState.mReadLimit;
588 } else {
589 uint32_t absoluteIndex = aReadState.mSegment + aIndex;
590 uint32_t numSegments = mBuffer.GetSegmentCount();
591 if (absoluteIndex >= numSegments) {
592 aCursor = aLimit = nullptr;
593 } else {
594 aCursor = mBuffer.GetSegment(absoluteIndex);
595 if (mWriteSegment == (int32_t)absoluteIndex) {
596 aLimit = mWriteCursor;
597 } else {
598 aLimit = aCursor + mBuffer.GetSegmentSize();
604 nsresult nsPipe::GetReadSegment(nsPipeReadState& aReadState,
605 const char*& aSegment, uint32_t& aLength) {
606 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
608 if (aReadState.mReadCursor == aReadState.mReadLimit) {
609 return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
612 // The input stream locks the pipe while getting the buffer to read from,
613 // but then unlocks while actual data copying is taking place. In
614 // order to avoid deleting the buffer out from under this lockless read
615 // set a flag to indicate a read is active. This flag is only modified
616 // while the lock is held.
617 MOZ_DIAGNOSTIC_ASSERT(!aReadState.mActiveRead);
618 aReadState.mActiveRead = true;
620 aSegment = aReadState.mReadCursor;
621 aLength = aReadState.mReadLimit - aReadState.mReadCursor;
622 MOZ_DIAGNOSTIC_ASSERT(aLength <= aReadState.mAvailable);
624 return NS_OK;
627 void nsPipe::ReleaseReadSegment(nsPipeReadState& aReadState,
628 nsPipeEvents& aEvents) {
629 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
631 MOZ_DIAGNOSTIC_ASSERT(aReadState.mActiveRead);
632 aReadState.mActiveRead = false;
634 // When a read completes and releases the mActiveRead flag, we may have
635 // blocked a drain from completing. This occurs when the input stream is
636 // closed during the read. In these cases, we need to complete the drain as
637 // soon as the active read completes.
638 if (aReadState.mNeedDrain) {
639 aReadState.mNeedDrain = false;
640 DrainInputStream(aReadState, aEvents);
644 void nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState,
645 uint32_t aBytesRead) {
646 MOZ_DIAGNOSTIC_ASSERT(aBytesRead > 0);
648 nsPipeEvents events;
650 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
652 LOG(("III advancing read cursor by %u\n", aBytesRead));
653 MOZ_DIAGNOSTIC_ASSERT(aBytesRead <= mBuffer.GetSegmentSize());
655 aReadState.mReadCursor += aBytesRead;
656 MOZ_DIAGNOSTIC_ASSERT(aReadState.mReadCursor <= aReadState.mReadLimit);
658 MOZ_DIAGNOSTIC_ASSERT(aReadState.mAvailable >= aBytesRead);
659 aReadState.mAvailable -= aBytesRead;
661 // Check to see if we're at the end of the available read data. If we
662 // are, and this segment is not still being written, then we can possibly
663 // free up the segment.
664 if (aReadState.mReadCursor == aReadState.mReadLimit &&
665 !ReadSegmentBeingWritten(aReadState)) {
666 // Advance the segment position. If we have read any segments from the
667 // advance buffer then we can potentially notify blocked writers.
668 mOutput.Monitor().AssertCurrentThreadIn();
669 if (AdvanceReadSegment(aReadState, mon) == SegmentAdvanceBufferRead &&
670 mOutput.OnOutputWritable(events) == NotifyMonitor) {
671 mon.NotifyAll();
675 ReleaseReadSegment(aReadState, events);
679 SegmentChangeResult nsPipe::AdvanceReadSegment(
680 nsPipeReadState& aReadState, const ReentrantMonitorAutoEnter& ev) {
681 // Calculate how many segments are buffered for this stream to start.
682 uint32_t startBufferSegments = GetBufferSegmentCount(aReadState, ev);
684 int32_t currentSegment = aReadState.mSegment;
686 // Move to the next segment to read
687 aReadState.mSegment += 1;
689 // If this was the last reference to the first segment, then remove it.
690 if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
691 // shift write and read segment index (-1 indicates an empty buffer).
692 mWriteSegment -= 1;
694 // Directly modify the current read state. If the associated input
695 // stream is closed simultaneous with reading, then it may not be
696 // in the mInputList any more.
697 aReadState.mSegment -= 1;
699 for (uint32_t i = 0; i < mInputList.Length(); ++i) {
700 // Skip the current read state structure since we modify it manually
701 // before entering this loop.
702 if (&mInputList[i]->ReadState() == &aReadState) {
703 continue;
705 mInputList[i]->ReadState().mSegment -= 1;
708 // done with this segment
709 mBuffer.DeleteFirstSegment();
710 LOG(("III deleting first segment\n"));
713 if (mWriteSegment < aReadState.mSegment) {
714 // read cursor has hit the end of written data, so reset it
715 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
716 aReadState.mReadCursor = nullptr;
717 aReadState.mReadLimit = nullptr;
718 // also, the buffer is completely empty, so reset the write cursor
719 if (mWriteSegment == -1) {
720 mWriteCursor = nullptr;
721 mWriteLimit = nullptr;
723 } else {
724 // advance read cursor and limit to next buffer segment
725 aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
726 if (mWriteSegment == aReadState.mSegment) {
727 aReadState.mReadLimit = mWriteCursor;
728 } else {
729 aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
733 // Calculate how many segments are buffered for the stream after
734 // reading.
735 uint32_t endBufferSegments = GetBufferSegmentCount(aReadState, ev);
737 // If the stream has read a segment out of the set of advanced buffer
738 // segments, then the writer may advance.
739 if (startBufferSegments >= mMaxAdvanceBufferSegmentCount &&
740 endBufferSegments < mMaxAdvanceBufferSegmentCount) {
741 return SegmentAdvanceBufferRead;
744 // Otherwise there are no significant changes to the segment structure.
745 return SegmentNotChanged;
748 void nsPipe::DrainInputStream(nsPipeReadState& aReadState,
749 nsPipeEvents& aEvents) {
750 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
752 // If a segment is actively being read in ReadSegments() for this input
753 // stream, then we cannot drain the stream. This can happen because
754 // ReadSegments() does not hold the lock while copying from the buffer.
755 // If we detect this condition, simply note that we need a drain once
756 // the read completes and return immediately.
757 if (aReadState.mActiveRead) {
758 MOZ_DIAGNOSTIC_ASSERT(!aReadState.mNeedDrain);
759 aReadState.mNeedDrain = true;
760 return;
763 while (mWriteSegment >= aReadState.mSegment) {
764 // If the last segment to free is still being written to, we're done
765 // draining. We can't free any more.
766 if (ReadSegmentBeingWritten(aReadState)) {
767 break;
770 // Don't bother checking if this results in an advance buffer segment
771 // read. Since we are draining the entire stream we will read an
772 // advance buffer segment no matter what.
773 AdvanceReadSegment(aReadState, mon);
776 // Force the stream into an empty state. Make sure mAvailable, mCursor, and
777 // mReadLimit are consistent with one another.
778 aReadState.mAvailable = 0;
779 aReadState.mReadCursor = nullptr;
780 aReadState.mReadLimit = nullptr;
782 // Remove the input stream from the pipe's list of streams. This will
783 // prevent the pipe from holding the stream alive or trying to update
784 // its read state any further.
785 DebugOnly<uint32_t> numRemoved = 0;
786 mInputList.RemoveElementsBy([&](nsPipeInputStream* aEntry) {
787 bool result = &aReadState == &aEntry->ReadState();
788 numRemoved += result ? 1 : 0;
789 return result;
791 MOZ_ASSERT(numRemoved == 1);
793 // If we have read any segments from the advance buffer then we can
794 // potentially notify blocked writers.
795 mOutput.Monitor().AssertCurrentThreadIn();
796 if (!IsAdvanceBufferFull(mon) &&
797 mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
798 mon.NotifyAll();
802 bool nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState) {
803 mReentrantMonitor.AssertCurrentThreadIn();
804 bool beingWritten =
805 mWriteSegment == aReadState.mSegment && mWriteLimit > mWriteCursor;
806 MOZ_DIAGNOSTIC_ASSERT(!beingWritten || aReadState.mReadLimit == mWriteCursor);
807 return beingWritten;
810 nsresult nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen) {
811 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
813 if (NS_FAILED(mStatus)) {
814 return mStatus;
817 // write cursor and limit may both be null indicating an empty buffer.
818 if (mWriteCursor == mWriteLimit) {
819 // The pipe is full if we have hit our limit on advance data buffering.
820 // This means the fastest reader is still reading slower than data is
821 // being written into the pipe.
822 if (IsAdvanceBufferFull(mon)) {
823 return NS_BASE_STREAM_WOULD_BLOCK;
826 // The nsSegmentedBuffer is configured to be "infinite", so this
827 // should never return nullptr here.
828 char* seg = mBuffer.AppendNewSegment();
829 if (!seg) {
830 return NS_ERROR_OUT_OF_MEMORY;
833 LOG(("OOO appended new segment\n"));
834 mWriteCursor = seg;
835 mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
836 ++mWriteSegment;
839 // make sure read cursor is initialized
840 SetAllNullReadCursors();
842 // check to see if we can roll-back our read and write cursors to the
843 // beginning of the current/first segment. this is purely an optimization.
844 if (mWriteSegment == 0 && AllReadCursorsMatchWriteCursor()) {
845 char* head = mBuffer.GetSegment(0);
846 LOG(("OOO rolling back write cursor %" PRId64 " bytes\n",
847 static_cast<int64_t>(mWriteCursor - head)));
848 RollBackAllReadCursors(head);
849 mWriteCursor = head;
852 aSegment = mWriteCursor;
853 aSegmentLen = mWriteLimit - mWriteCursor;
854 return NS_OK;
857 void nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten) {
858 MOZ_DIAGNOSTIC_ASSERT(aBytesWritten > 0);
860 nsPipeEvents events;
862 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
864 LOG(("OOO advancing write cursor by %u\n", aBytesWritten));
866 char* newWriteCursor = mWriteCursor + aBytesWritten;
867 MOZ_DIAGNOSTIC_ASSERT(newWriteCursor <= mWriteLimit);
869 // update read limit if reading in the same segment
870 UpdateAllReadCursors(newWriteCursor);
872 mWriteCursor = newWriteCursor;
874 ValidateAllReadCursors();
876 // update the writable flag on the output stream
877 if (mWriteCursor == mWriteLimit) {
878 mOutput.Monitor().AssertCurrentThreadIn();
879 mOutput.SetWritable(!IsAdvanceBufferFull(mon));
882 // notify input stream that pipe now contains additional data
883 bool needNotify = false;
884 for (uint32_t i = 0; i < mInputList.Length(); ++i) {
885 mInputList[i]->Monitor().AssertCurrentThreadIn();
886 if (mInputList[i]->OnInputReadable(aBytesWritten, events, mon) ==
887 NotifyMonitor) {
888 needNotify = true;
892 if (needNotify) {
893 mon.NotifyAll();
898 void nsPipe::OnInputStreamException(nsPipeInputStream* aStream,
899 nsresult aReason) {
900 MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
902 nsPipeEvents events;
904 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
906 // Its possible to re-enter this method when we call OnPipeException() or
907 // OnInputExection() below. If there is a caller stuck in our synchronous
908 // Wait() method, then they will get woken up with a failure code which
909 // re-enters this method. Therefore, gracefully handle unknown streams
910 // here.
912 // If we only have one stream open and it is the given stream, then shut
913 // down the entire pipe.
914 if (mInputList.Length() == 1) {
915 if (mInputList[0] == aStream) {
916 OnPipeException(aReason);
918 return;
921 // Otherwise just close the particular stream that hit an exception.
922 for (uint32_t i = 0; i < mInputList.Length(); ++i) {
923 if (mInputList[i] != aStream) {
924 continue;
927 mInputList[i]->Monitor().AssertCurrentThreadIn();
928 MonitorAction action =
929 mInputList[i]->OnInputException(aReason, events, mon);
931 // Notify after element is removed in case we re-enter as a result.
932 if (action == NotifyMonitor) {
933 mon.NotifyAll();
936 return;
941 void nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly) {
942 LOG(("PPP nsPipe::OnPipeException [reason=%" PRIx32 " output-only=%d]\n",
943 static_cast<uint32_t>(aReason), aOutputOnly));
945 nsPipeEvents events;
947 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
949 // if we've already hit an exception, then ignore this one.
950 if (NS_FAILED(mStatus)) {
951 return;
954 mStatus = aReason;
956 bool needNotify = false;
958 // OnInputException() can drain the stream and remove it from
959 // mInputList. So iterate over a temp list instead.
960 nsTArray<nsPipeInputStream*> list = mInputList.Clone();
961 for (uint32_t i = 0; i < list.Length(); ++i) {
962 // an output-only exception applies to the input end if the pipe has
963 // zero bytes available.
964 list[i]->Monitor().AssertCurrentThreadIn();
965 if (aOutputOnly && list[i]->Available()) {
966 continue;
969 if (list[i]->OnInputException(aReason, events, mon) == NotifyMonitor) {
970 needNotify = true;
974 mOutput.Monitor().AssertCurrentThreadIn();
975 if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) {
976 needNotify = true;
979 // Notify after we have removed any input streams from mInputList
980 if (needNotify) {
981 mon.NotifyAll();
986 nsresult nsPipe::CloneInputStream(nsPipeInputStream* aOriginal,
987 nsIInputStream** aCloneOut) {
988 ReentrantMonitorAutoEnter mon(mReentrantMonitor);
989 RefPtr<nsPipeInputStream> ref = new nsPipeInputStream(*aOriginal);
990 // don't add clones of closed pipes to mInputList.
991 ref->Monitor().AssertCurrentThreadIn();
992 if (NS_SUCCEEDED(ref->InputStatus(mon))) {
993 mInputList.AppendElement(ref);
995 nsCOMPtr<nsIAsyncInputStream> upcast = std::move(ref);
996 upcast.forget(aCloneOut);
997 return NS_OK;
1000 uint32_t nsPipe::CountSegmentReferences(int32_t aSegment) {
1001 mReentrantMonitor.AssertCurrentThreadIn();
1002 uint32_t count = 0;
1003 for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1004 if (aSegment >= mInputList[i]->ReadState().mSegment) {
1005 count += 1;
1008 return count;
1011 void nsPipe::SetAllNullReadCursors() {
1012 mReentrantMonitor.AssertCurrentThreadIn();
1013 for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1014 nsPipeReadState& readState = mInputList[i]->ReadState();
1015 if (!readState.mReadCursor) {
1016 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment);
1017 readState.mReadCursor = readState.mReadLimit = mWriteCursor;
1022 bool nsPipe::AllReadCursorsMatchWriteCursor() {
1023 mReentrantMonitor.AssertCurrentThreadIn();
1024 for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1025 const nsPipeReadState& readState = mInputList[i]->ReadState();
1026 if (readState.mSegment != mWriteSegment ||
1027 readState.mReadCursor != mWriteCursor) {
1028 return false;
1031 return true;
1034 void nsPipe::RollBackAllReadCursors(char* aWriteCursor) {
1035 mReentrantMonitor.AssertCurrentThreadIn();
1036 for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1037 nsPipeReadState& readState = mInputList[i]->ReadState();
1038 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment);
1039 MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadCursor);
1040 MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadLimit);
1041 readState.mReadCursor = aWriteCursor;
1042 readState.mReadLimit = aWriteCursor;
1046 void nsPipe::UpdateAllReadCursors(char* aWriteCursor) {
1047 mReentrantMonitor.AssertCurrentThreadIn();
1048 for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1049 nsPipeReadState& readState = mInputList[i]->ReadState();
1050 if (mWriteSegment == readState.mSegment &&
1051 readState.mReadLimit == mWriteCursor) {
1052 readState.mReadLimit = aWriteCursor;
1057 void nsPipe::ValidateAllReadCursors() {
1058 mReentrantMonitor.AssertCurrentThreadIn();
1059 // The only way mReadCursor == mWriteCursor is if:
1061 // - mReadCursor is at the start of a segment (which, based on how
1062 // nsSegmentedBuffer works, means that this segment is the "first"
1063 // segment)
1064 // - mWriteCursor points at the location past the end of the current
1065 // write segment (so the current write filled the current write
1066 // segment, so we've incremented mWriteCursor to point past the end
1067 // of it)
1068 // - the segment to which data has just been written is located
1069 // exactly one segment's worth of bytes before the first segment
1070 // where mReadCursor is located
1072 // Consequently, the byte immediately after the end of the current
1073 // write segment is the first byte of the first segment, so
1074 // mReadCursor == mWriteCursor. (Another way to think about this is
1075 // to consider the buffer architecture diagram above, but consider it
1076 // with an arena allocator which allocates from the *end* of the
1077 // arena to the *beginning* of the arena.)
1078 #ifdef DEBUG
1079 for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1080 const nsPipeReadState& state = mInputList[i]->ReadState();
1081 MOZ_ASSERT(state.mReadCursor != mWriteCursor ||
1082 (mBuffer.GetSegment(state.mSegment) == state.mReadCursor &&
1083 mWriteCursor == mWriteLimit));
1085 #endif
1088 uint32_t nsPipe::GetBufferSegmentCount(
1089 const nsPipeReadState& aReadState,
1090 const ReentrantMonitorAutoEnter& ev) const {
1091 // The write segment can be smaller than the current reader position
1092 // in some cases. For example, when the first write segment has not
1093 // been allocated yet mWriteSegment is negative. In these cases
1094 // the stream is effectively using zero segments.
1095 if (mWriteSegment < aReadState.mSegment) {
1096 return 0;
1099 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= 0);
1100 MOZ_DIAGNOSTIC_ASSERT(aReadState.mSegment >= 0);
1102 // Otherwise at least one segment is being used. We add one here
1103 // since a single segment is being used when the write and read
1104 // segment indices are the same.
1105 return 1 + mWriteSegment - aReadState.mSegment;
1108 bool nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const {
1109 // If we have fewer total segments than the limit we can immediately
1110 // determine we are not full. Note, we must add one to mWriteSegment
1111 // to convert from a index to a count.
1112 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= -1);
1113 MOZ_DIAGNOSTIC_ASSERT(mWriteSegment < INT32_MAX);
1114 uint32_t totalWriteSegments = mWriteSegment + 1;
1115 if (totalWriteSegments < mMaxAdvanceBufferSegmentCount) {
1116 return false;
1119 // Otherwise we must inspect all of our reader streams. We need
1120 // to determine the buffer depth of the fastest reader.
1121 uint32_t minBufferSegments = UINT32_MAX;
1122 for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1123 // Only count buffer segments from input streams that are open.
1124 mInputList[i]->Monitor().AssertCurrentThreadIn();
1125 if (NS_FAILED(mInputList[i]->Status(ev))) {
1126 continue;
1128 const nsPipeReadState& state = mInputList[i]->ReadState();
1129 uint32_t bufferSegments = GetBufferSegmentCount(state, ev);
1130 minBufferSegments = std::min(minBufferSegments, bufferSegments);
1131 // We only care if any reader has fewer segments buffered than
1132 // our threshold. We can stop once we hit that threshold.
1133 if (minBufferSegments < mMaxAdvanceBufferSegmentCount) {
1134 return false;
1138 // Note, its possible for minBufferSegments to exceed our
1139 // mMaxAdvanceBufferSegmentCount here. This happens when a cloned
1140 // reader gets far behind, but then the fastest reader stream is
1141 // closed. This leaves us with a single stream that is buffered
1142 // beyond our max. Naturally we continue to indicate the pipe
1143 // is full at this point.
1145 return true;
1148 //-----------------------------------------------------------------------------
1149 // nsPipeEvents methods:
1150 //-----------------------------------------------------------------------------
1152 nsPipeEvents::~nsPipeEvents() {
1153 // dispatch any pending events
1154 for (auto& callback : mCallbacks) {
1155 callback.Notify();
1157 mCallbacks.Clear();
1160 //-----------------------------------------------------------------------------
1161 // nsPipeInputStream methods:
1162 //-----------------------------------------------------------------------------
1164 NS_IMPL_ADDREF(nsPipeInputStream);
1165 NS_IMPL_RELEASE(nsPipeInputStream);
1167 NS_INTERFACE_TABLE_HEAD(nsPipeInputStream)
1168 NS_INTERFACE_TABLE_BEGIN
1169 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIAsyncInputStream)
1170 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsITellableStream)
1171 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISearchableInputStream)
1172 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsICloneableInputStream)
1173 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIBufferedInputStream)
1174 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIClassInfo)
1175 NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIInputStreamPriority)
1176 NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsIInputStream,
1177 nsIAsyncInputStream)
1178 NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsISupports,
1179 nsIAsyncInputStream)
1180 NS_INTERFACE_TABLE_END
1181 NS_INTERFACE_TABLE_TAIL
1183 NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream, nsIInputStream,
1184 nsIAsyncInputStream, nsITellableStream,
1185 nsISearchableInputStream, nsICloneableInputStream,
1186 nsIBufferedInputStream)
1188 NS_IMPL_THREADSAFE_CI(nsPipeInputStream)
1190 NS_IMETHODIMP
1191 nsPipeInputStream::Init(nsIInputStream*, uint32_t) {
1192 MOZ_CRASH(
1193 "nsPipeInputStream should never be initialized with "
1194 "nsIBufferedInputStream::Init!\n");
1197 NS_IMETHODIMP
1198 nsPipeInputStream::GetData(nsIInputStream** aResult) {
1199 // as this was not created with init() we are not
1200 // wrapping anything
1201 return NS_ERROR_NOT_IMPLEMENTED;
1204 uint32_t nsPipeInputStream::Available() {
1205 mPipe->mReentrantMonitor.AssertCurrentThreadIn();
1206 return mReadState.mAvailable;
1209 nsresult nsPipeInputStream::Wait() {
1210 MOZ_DIAGNOSTIC_ASSERT(mBlocking);
1212 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1214 while (NS_SUCCEEDED(Status(mon)) && (mReadState.mAvailable == 0)) {
1215 LOG(("III pipe input: waiting for data\n"));
1217 mBlocked = true;
1218 mon.Wait();
1219 mBlocked = false;
1221 LOG(("III pipe input: woke up [status=%" PRIx32 " available=%u]\n",
1222 static_cast<uint32_t>(Status(mon)), mReadState.mAvailable));
1225 return Status(mon) == NS_BASE_STREAM_CLOSED ? NS_OK : Status(mon);
1228 MonitorAction nsPipeInputStream::OnInputReadable(
1229 uint32_t aBytesWritten, nsPipeEvents& aEvents,
1230 const ReentrantMonitorAutoEnter& ev) {
1231 MonitorAction result = DoNotNotifyMonitor;
1233 mPipe->mReentrantMonitor.AssertCurrentThreadIn();
1234 mReadState.mAvailable += aBytesWritten;
1236 if (mCallback && !(mCallback.Flags() & WAIT_CLOSURE_ONLY)) {
1237 aEvents.NotifyReady(std::move(mCallback));
1238 } else if (mBlocked) {
1239 result = NotifyMonitor;
1242 return result;
1245 MonitorAction nsPipeInputStream::OnInputException(
1246 nsresult aReason, nsPipeEvents& aEvents,
1247 const ReentrantMonitorAutoEnter& ev) {
1248 LOG(("nsPipeInputStream::OnInputException [this=%p reason=%" PRIx32 "]\n",
1249 this, static_cast<uint32_t>(aReason)));
1251 MonitorAction result = DoNotNotifyMonitor;
1253 MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
1255 if (NS_SUCCEEDED(mInputStatus)) {
1256 mInputStatus = aReason;
1259 // force count of available bytes to zero.
1260 mPipe->DrainInputStream(mReadState, aEvents);
1262 if (mCallback) {
1263 aEvents.NotifyReady(std::move(mCallback));
1264 } else if (mBlocked) {
1265 result = NotifyMonitor;
1268 return result;
1271 NS_IMETHODIMP
1272 nsPipeInputStream::CloseWithStatus(nsresult aReason) {
1273 LOG(("III CloseWithStatus [this=%p reason=%" PRIx32 "]\n", this,
1274 static_cast<uint32_t>(aReason)));
1276 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1278 if (NS_FAILED(mInputStatus)) {
1279 return NS_OK;
1282 if (NS_SUCCEEDED(aReason)) {
1283 aReason = NS_BASE_STREAM_CLOSED;
1286 mPipe->OnInputStreamException(this, aReason);
1287 return NS_OK;
1290 NS_IMETHODIMP
1291 nsPipeInputStream::SetPriority(uint32_t priority) {
1292 mPriority = priority;
1293 return NS_OK;
1296 NS_IMETHODIMP
1297 nsPipeInputStream::GetPriority(uint32_t* priority) {
1298 *priority = mPriority;
1299 return NS_OK;
1302 NS_IMETHODIMP
1303 nsPipeInputStream::Close() { return CloseWithStatus(NS_BASE_STREAM_CLOSED); }
1305 NS_IMETHODIMP
1306 nsPipeInputStream::Available(uint64_t* aResult) {
1307 // nsPipeInputStream supports under 4GB stream only
1308 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1310 // return error if closed
1311 if (!mReadState.mAvailable && NS_FAILED(Status(mon))) {
1312 return Status(mon);
1315 *aResult = (uint64_t)mReadState.mAvailable;
1316 return NS_OK;
1319 NS_IMETHODIMP
1320 nsPipeInputStream::StreamStatus() {
1321 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1322 return mReadState.mAvailable ? NS_OK : Status(mon);
1325 NS_IMETHODIMP
1326 nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
1327 uint32_t aCount, uint32_t* aReadCount) {
1328 LOG(("III ReadSegments [this=%p count=%u]\n", this, aCount));
1330 nsresult rv = NS_OK;
1332 *aReadCount = 0;
1333 while (aCount) {
1334 AutoReadSegment segment(mPipe, mReadState, aCount);
1335 rv = segment.Status();
1336 if (NS_FAILED(rv)) {
1337 // ignore this error if we've already read something.
1338 if (*aReadCount > 0) {
1339 rv = NS_OK;
1340 break;
1342 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
1343 // pipe is empty
1344 if (!mBlocking) {
1345 break;
1347 // wait for some data to be written to the pipe
1348 rv = Wait();
1349 if (NS_SUCCEEDED(rv)) {
1350 continue;
1353 // ignore this error, just return.
1354 if (rv == NS_BASE_STREAM_CLOSED) {
1355 rv = NS_OK;
1356 break;
1358 mPipe->OnInputStreamException(this, rv);
1359 break;
1362 uint32_t writeCount;
1363 while (segment.Length()) {
1364 writeCount = 0;
1366 rv = aWriter(static_cast<nsIAsyncInputStream*>(this), aClosure,
1367 segment.Data(), *aReadCount, segment.Length(), &writeCount);
1369 if (NS_FAILED(rv) || writeCount == 0) {
1370 aCount = 0;
1371 // any errors returned from the writer end here: do not
1372 // propagate to the caller of ReadSegments.
1373 rv = NS_OK;
1374 break;
1377 MOZ_DIAGNOSTIC_ASSERT(writeCount <= segment.Length());
1378 segment.Advance(writeCount);
1379 aCount -= writeCount;
1380 *aReadCount += writeCount;
1381 mLogicalOffset += writeCount;
1385 return rv;
1388 NS_IMETHODIMP
1389 nsPipeInputStream::Read(char* aToBuf, uint32_t aBufLen, uint32_t* aReadCount) {
1390 return ReadSegments(NS_CopySegmentToBuffer, aToBuf, aBufLen, aReadCount);
1393 NS_IMETHODIMP
1394 nsPipeInputStream::IsNonBlocking(bool* aNonBlocking) {
1395 *aNonBlocking = !mBlocking;
1396 return NS_OK;
1399 NS_IMETHODIMP
1400 nsPipeInputStream::AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
1401 uint32_t aRequestedCount,
1402 nsIEventTarget* aTarget) {
1403 LOG(("III AsyncWait [this=%p]\n", this));
1405 nsPipeEvents pipeEvents;
1407 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1409 // replace a pending callback
1410 mCallback = nullptr;
1412 if (!aCallback) {
1413 return NS_OK;
1416 CallbackHolder callback(this, aCallback, aFlags, aTarget);
1418 if (NS_FAILED(Status(mon)) ||
1419 (mReadState.mAvailable && !(aFlags & WAIT_CLOSURE_ONLY))) {
1420 // stream is already closed or readable; post event.
1421 pipeEvents.NotifyReady(std::move(callback));
1422 } else {
1423 // queue up callback object to be notified when data becomes available
1424 mCallback = std::move(callback);
1427 return NS_OK;
1430 NS_IMETHODIMP
1431 nsPipeInputStream::Tell(int64_t* aOffset) {
1432 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1434 // return error if closed
1435 if (!mReadState.mAvailable && NS_FAILED(Status(mon))) {
1436 return Status(mon);
1439 *aOffset = mLogicalOffset;
1440 return NS_OK;
1443 static bool strings_equal(bool aIgnoreCase, const char* aS1, const char* aS2,
1444 uint32_t aLen) {
1445 return aIgnoreCase ? !nsCRT::strncasecmp(aS1, aS2, aLen)
1446 : !strncmp(aS1, aS2, aLen);
1449 NS_IMETHODIMP
1450 nsPipeInputStream::Search(const char* aForString, bool aIgnoreCase,
1451 bool* aFound, uint32_t* aOffsetSearchedTo) {
1452 LOG(("III Search [for=%s ic=%u]\n", aForString, aIgnoreCase));
1454 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1456 char* cursor1;
1457 char* limit1;
1458 uint32_t index = 0, offset = 0;
1459 uint32_t strLen = strlen(aForString);
1461 mPipe->PeekSegment(mReadState, 0, cursor1, limit1);
1462 if (cursor1 == limit1) {
1463 *aFound = false;
1464 *aOffsetSearchedTo = 0;
1465 LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
1466 return NS_OK;
1469 while (true) {
1470 uint32_t i, len1 = limit1 - cursor1;
1472 // check if the string is in the buffer segment
1473 for (i = 0; i < len1 - strLen + 1; i++) {
1474 if (strings_equal(aIgnoreCase, &cursor1[i], aForString, strLen)) {
1475 *aFound = true;
1476 *aOffsetSearchedTo = offset + i;
1477 LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
1478 return NS_OK;
1482 // get the next segment
1483 char* cursor2;
1484 char* limit2;
1485 uint32_t len2;
1487 index++;
1488 offset += len1;
1490 mPipe->PeekSegment(mReadState, index, cursor2, limit2);
1491 if (cursor2 == limit2) {
1492 *aFound = false;
1493 *aOffsetSearchedTo = offset - strLen + 1;
1494 LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
1495 return NS_OK;
1497 len2 = limit2 - cursor2;
1499 // check if the string is straddling the next buffer segment
1500 uint32_t lim = XPCOM_MIN(strLen, len2 + 1);
1501 for (i = 0; i < lim; ++i) {
1502 uint32_t strPart1Len = strLen - i - 1;
1503 uint32_t strPart2Len = strLen - strPart1Len;
1504 const char* strPart2 = &aForString[strLen - strPart2Len];
1505 uint32_t bufSeg1Offset = len1 - strPart1Len;
1506 if (strings_equal(aIgnoreCase, &cursor1[bufSeg1Offset], aForString,
1507 strPart1Len) &&
1508 strings_equal(aIgnoreCase, cursor2, strPart2, strPart2Len)) {
1509 *aFound = true;
1510 *aOffsetSearchedTo = offset - strPart1Len;
1511 LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
1512 return NS_OK;
1516 // finally continue with the next buffer
1517 cursor1 = cursor2;
1518 limit1 = limit2;
1521 MOZ_ASSERT_UNREACHABLE("can't get here");
1522 return NS_ERROR_UNEXPECTED; // keep compiler happy
1525 NS_IMETHODIMP
1526 nsPipeInputStream::GetCloneable(bool* aCloneableOut) {
1527 *aCloneableOut = true;
1528 return NS_OK;
1531 NS_IMETHODIMP
1532 nsPipeInputStream::Clone(nsIInputStream** aCloneOut) {
1533 return mPipe->CloneInputStream(this, aCloneOut);
1536 nsresult nsPipeInputStream::Status(const ReentrantMonitorAutoEnter& ev) const {
1537 if (NS_FAILED(mInputStatus)) {
1538 return mInputStatus;
1541 if (mReadState.mAvailable) {
1542 // Still something to read and this input stream state is OK.
1543 return NS_OK;
1546 // Nothing to read, just fall through to the pipe's state that
1547 // may reflect state of its output stream side (already closed).
1548 return mPipe->mStatus;
1551 nsresult nsPipeInputStream::Status() const {
1552 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1553 return Status(mon);
1556 nsPipeInputStream::~nsPipeInputStream() { Close(); }
1558 //-----------------------------------------------------------------------------
1559 // nsPipeOutputStream methods:
1560 //-----------------------------------------------------------------------------
1562 NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream, nsIOutputStream,
1563 nsIAsyncOutputStream, nsIClassInfo)
1565 NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream, nsIOutputStream,
1566 nsIAsyncOutputStream)
1568 NS_IMPL_THREADSAFE_CI(nsPipeOutputStream)
1570 nsresult nsPipeOutputStream::Wait() {
1571 MOZ_DIAGNOSTIC_ASSERT(mBlocking);
1573 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1575 if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) {
1576 LOG(("OOO pipe output: waiting for space\n"));
1577 mBlocked = true;
1578 mon.Wait();
1579 mBlocked = false;
1580 LOG(("OOO pipe output: woke up [pipe-status=%" PRIx32 " writable=%u]\n",
1581 static_cast<uint32_t>(mPipe->mStatus), mWritable));
1584 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
1587 MonitorAction nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents) {
1588 MonitorAction result = DoNotNotifyMonitor;
1590 mWritable = true;
1592 if (mCallback && !(mCallback.Flags() & WAIT_CLOSURE_ONLY)) {
1593 aEvents.NotifyReady(std::move(mCallback));
1594 } else if (mBlocked) {
1595 result = NotifyMonitor;
1598 return result;
1601 MonitorAction nsPipeOutputStream::OnOutputException(nsresult aReason,
1602 nsPipeEvents& aEvents) {
1603 LOG(("nsPipeOutputStream::OnOutputException [this=%p reason=%" PRIx32 "]\n",
1604 this, static_cast<uint32_t>(aReason)));
1606 MonitorAction result = DoNotNotifyMonitor;
1608 MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
1609 mWritable = false;
1611 if (mCallback) {
1612 aEvents.NotifyReady(std::move(mCallback));
1613 } else if (mBlocked) {
1614 result = NotifyMonitor;
1617 return result;
1620 NS_IMETHODIMP_(MozExternalRefCountType)
1621 nsPipeOutputStream::AddRef() {
1622 ++mWriterRefCnt;
1623 return mPipe->AddRef();
1626 NS_IMETHODIMP_(MozExternalRefCountType)
1627 nsPipeOutputStream::Release() {
1628 if (--mWriterRefCnt == 0) {
1629 Close();
1631 return mPipe->Release();
1634 NS_IMETHODIMP
1635 nsPipeOutputStream::CloseWithStatus(nsresult aReason) {
1636 LOG(("OOO CloseWithStatus [this=%p reason=%" PRIx32 "]\n", this,
1637 static_cast<uint32_t>(aReason)));
1639 if (NS_SUCCEEDED(aReason)) {
1640 aReason = NS_BASE_STREAM_CLOSED;
1643 // input stream may remain open
1644 mPipe->OnPipeException(aReason, true);
1645 return NS_OK;
1648 NS_IMETHODIMP
1649 nsPipeOutputStream::Close() { return CloseWithStatus(NS_BASE_STREAM_CLOSED); }
1651 NS_IMETHODIMP
1652 nsPipeOutputStream::WriteSegments(nsReadSegmentFun aReader, void* aClosure,
1653 uint32_t aCount, uint32_t* aWriteCount) {
1654 LOG(("OOO WriteSegments [this=%p count=%u]\n", this, aCount));
1656 nsresult rv = NS_OK;
1658 char* segment;
1659 uint32_t segmentLen;
1661 *aWriteCount = 0;
1662 while (aCount) {
1663 rv = mPipe->GetWriteSegment(segment, segmentLen);
1664 if (NS_FAILED(rv)) {
1665 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
1666 // pipe is full
1667 if (!mBlocking) {
1668 // ignore this error if we've already written something
1669 if (*aWriteCount > 0) {
1670 rv = NS_OK;
1672 break;
1674 // wait for the pipe to have an empty segment.
1675 rv = Wait();
1676 if (NS_SUCCEEDED(rv)) {
1677 continue;
1680 mPipe->OnPipeException(rv);
1681 break;
1684 // write no more than aCount
1685 if (segmentLen > aCount) {
1686 segmentLen = aCount;
1689 uint32_t readCount, originalLen = segmentLen;
1690 while (segmentLen) {
1691 readCount = 0;
1693 rv = aReader(this, aClosure, segment, *aWriteCount, segmentLen,
1694 &readCount);
1696 if (NS_FAILED(rv) || readCount == 0) {
1697 aCount = 0;
1698 // any errors returned from the aReader end here: do not
1699 // propagate to the caller of WriteSegments.
1700 rv = NS_OK;
1701 break;
1704 MOZ_DIAGNOSTIC_ASSERT(readCount <= segmentLen);
1705 segment += readCount;
1706 segmentLen -= readCount;
1707 aCount -= readCount;
1708 *aWriteCount += readCount;
1709 mLogicalOffset += readCount;
1712 if (segmentLen < originalLen) {
1713 mPipe->AdvanceWriteCursor(originalLen - segmentLen);
1717 return rv;
1720 NS_IMETHODIMP
1721 nsPipeOutputStream::Write(const char* aFromBuf, uint32_t aBufLen,
1722 uint32_t* aWriteCount) {
1723 return WriteSegments(NS_CopyBufferToSegment, (void*)aFromBuf, aBufLen,
1724 aWriteCount);
1727 NS_IMETHODIMP
1728 nsPipeOutputStream::Flush() {
1729 // nothing to do
1730 return NS_OK;
1733 NS_IMETHODIMP
1734 nsPipeOutputStream::StreamStatus() {
1735 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1736 return mPipe->mStatus;
1739 NS_IMETHODIMP
1740 nsPipeOutputStream::WriteFrom(nsIInputStream* aFromStream, uint32_t aCount,
1741 uint32_t* aWriteCount) {
1742 return WriteSegments(NS_CopyStreamToSegment, aFromStream, aCount,
1743 aWriteCount);
1746 NS_IMETHODIMP
1747 nsPipeOutputStream::IsNonBlocking(bool* aNonBlocking) {
1748 *aNonBlocking = !mBlocking;
1749 return NS_OK;
1752 NS_IMETHODIMP
1753 nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback* aCallback,
1754 uint32_t aFlags, uint32_t aRequestedCount,
1755 nsIEventTarget* aTarget) {
1756 LOG(("OOO AsyncWait [this=%p]\n", this));
1758 nsPipeEvents pipeEvents;
1760 ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1762 // replace a pending callback
1763 mCallback = nullptr;
1765 if (!aCallback) {
1766 return NS_OK;
1769 CallbackHolder callback(this, aCallback, aFlags, aTarget);
1771 if (NS_FAILED(mPipe->mStatus) ||
1772 (mWritable && !(aFlags & WAIT_CLOSURE_ONLY))) {
1773 // stream is already closed or writable; post event.
1774 pipeEvents.NotifyReady(std::move(callback));
1775 } else {
1776 // queue up callback object to be notified when data becomes available
1777 mCallback = std::move(callback);
1780 return NS_OK;
1783 ////////////////////////////////////////////////////////////////////////////////
1785 void NS_NewPipe(nsIInputStream** aPipeIn, nsIOutputStream** aPipeOut,
1786 uint32_t aSegmentSize, uint32_t aMaxSize,
1787 bool aNonBlockingInput, bool aNonBlockingOutput) {
1788 if (aSegmentSize == 0) {
1789 aSegmentSize = DEFAULT_SEGMENT_SIZE;
1792 // Handle aMaxSize of UINT32_MAX as a special case
1793 uint32_t segmentCount;
1794 if (aMaxSize == UINT32_MAX) {
1795 segmentCount = UINT32_MAX;
1796 } else {
1797 segmentCount = aMaxSize / aSegmentSize;
1800 nsIAsyncInputStream* in;
1801 nsIAsyncOutputStream* out;
1802 NS_NewPipe2(&in, &out, aNonBlockingInput, aNonBlockingOutput, aSegmentSize,
1803 segmentCount);
1805 *aPipeIn = in;
1806 *aPipeOut = out;
1809 // Disable thread safety analysis as this is logically a constructor, and no
1810 // additional threads can observe these objects yet.
1811 void NS_NewPipe2(nsIAsyncInputStream** aPipeIn, nsIAsyncOutputStream** aPipeOut,
1812 bool aNonBlockingInput, bool aNonBlockingOutput,
1813 uint32_t aSegmentSize,
1814 uint32_t aSegmentCount) MOZ_NO_THREAD_SAFETY_ANALYSIS {
1815 RefPtr<nsPipe> pipe =
1816 new nsPipe(aSegmentSize ? aSegmentSize : DEFAULT_SEGMENT_SIZE,
1817 aSegmentCount ? aSegmentCount : DEFAULT_SEGMENT_COUNT);
1819 RefPtr<nsPipeInputStream> pipeIn = new nsPipeInputStream(pipe);
1820 pipe->mInputList.AppendElement(pipeIn);
1821 RefPtr<nsPipeOutputStream> pipeOut = &pipe->mOutput;
1823 pipeIn->SetNonBlocking(aNonBlockingInput);
1824 pipeOut->SetNonBlocking(aNonBlockingOutput);
1826 pipeIn.forget(aPipeIn);
1827 pipeOut.forget(aPipeOut);
1830 ////////////////////////////////////////////////////////////////////////////////
1832 // Thin nsIPipe implementation for consumers of the component manager interface
1833 // for creating pipes. Acts as a thin wrapper around NS_NewPipe2 for JS callers.
1834 class nsPipeHolder final : public nsIPipe {
1835 public:
1836 NS_DECL_THREADSAFE_ISUPPORTS
1837 NS_DECL_NSIPIPE
1839 private:
1840 ~nsPipeHolder() = default;
1842 nsCOMPtr<nsIAsyncInputStream> mInput;
1843 nsCOMPtr<nsIAsyncOutputStream> mOutput;
1846 NS_IMPL_ISUPPORTS(nsPipeHolder, nsIPipe)
1848 NS_IMETHODIMP
1849 nsPipeHolder::Init(bool aNonBlockingInput, bool aNonBlockingOutput,
1850 uint32_t aSegmentSize, uint32_t aSegmentCount) {
1851 if (mInput || mOutput) {
1852 return NS_ERROR_ALREADY_INITIALIZED;
1854 NS_NewPipe2(getter_AddRefs(mInput), getter_AddRefs(mOutput),
1855 aNonBlockingInput, aNonBlockingOutput, aSegmentSize,
1856 aSegmentCount);
1857 return NS_OK;
1860 NS_IMETHODIMP
1861 nsPipeHolder::GetInputStream(nsIAsyncInputStream** aInputStream) {
1862 if (mInput) {
1863 *aInputStream = do_AddRef(mInput).take();
1864 return NS_OK;
1866 return NS_ERROR_NOT_INITIALIZED;
1869 NS_IMETHODIMP
1870 nsPipeHolder::GetOutputStream(nsIAsyncOutputStream** aOutputStream) {
1871 if (mOutput) {
1872 *aOutputStream = do_AddRef(mOutput).take();
1873 return NS_OK;
1875 return NS_ERROR_NOT_INITIALIZED;
1878 nsresult nsPipeConstructor(REFNSIID aIID, void** aResult) {
1879 RefPtr<nsPipeHolder> pipe = new nsPipeHolder();
1880 nsresult rv = pipe->QueryInterface(aIID, aResult);
1881 return rv;
1884 ////////////////////////////////////////////////////////////////////////////////