Bug 572417 - Release mouse capture in flash subclass after mouse events get delivered...
[mozilla-central.git] / xpcom / io / nsPipe3.cpp
blob686752dfca3fa7b93fa33acee10653272e51da83
1 /* ***** BEGIN LICENSE BLOCK *****
2 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
4 * The contents of this file are subject to the Mozilla Public License Version
5 * 1.1 (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 * http://www.mozilla.org/MPL/
9 * Software distributed under the License is distributed on an "AS IS" basis,
10 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
11 * for the specific language governing rights and limitations under the
12 * License.
14 * The Original Code is Mozilla.
16 * The Initial Developer of the Original Code is
17 * Netscape Communications Corporation.
18 * Portions created by the Initial Developer are Copyright (C) 2002
19 * the Initial Developer. All Rights Reserved.
21 * Contributor(s):
22 * Darin Fisher <darin@netscape.com>
24 * Alternatively, the contents of this file may be used under the terms of
25 * either the GNU General Public License Version 2 or later (the "GPL"), or
26 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27 * in which case the provisions of the GPL or the LGPL are applicable instead
28 * of those above. If you wish to allow use of your version of this file only
29 * under the terms of either the GPL or the LGPL, and not to allow others to
30 * use your version of this file under the terms of the MPL, indicate your
31 * decision by deleting the provisions above and replace them with the notice
32 * and other provisions required by the GPL or the LGPL. If you do not delete
33 * the provisions above, a recipient may use your version of this file under
34 * the terms of any one of the MPL, the GPL or the LGPL.
36 * ***** END LICENSE BLOCK ***** */
38 #include "nsIPipe.h"
39 #include "nsIEventTarget.h"
40 #include "nsISeekableStream.h"
41 #include "nsIProgrammingLanguage.h"
42 #include "nsSegmentedBuffer.h"
43 #include "nsStreamUtils.h"
44 #include "nsAutoLock.h"
45 #include "nsCOMPtr.h"
46 #include "nsCRT.h"
47 #include "prlog.h"
48 #include "nsInt64.h"
49 #include "nsIClassInfoImpl.h"
51 #if defined(PR_LOGGING)
53 // set NSPR_LOG_MODULES=nsPipe:5
55 static PRLogModuleInfo *gPipeLog = PR_NewLogModule("nsPipe");
56 #define LOG(args) PR_LOG(gPipeLog, PR_LOG_DEBUG, args)
57 #else
58 #define LOG(args)
59 #endif
61 #define DEFAULT_SEGMENT_SIZE 4096
62 #define DEFAULT_SEGMENT_COUNT 16
64 class nsPipe;
65 class nsPipeEvents;
66 class nsPipeInputStream;
67 class nsPipeOutputStream;
69 //-----------------------------------------------------------------------------
71 // this class is used to delay notifications until the end of a particular
72 // scope. it helps avoid the complexity of issuing callbacks while inside
73 // a critical section.
74 class nsPipeEvents
76 public:
77 nsPipeEvents() { }
78 ~nsPipeEvents();
80 inline void NotifyInputReady(nsIAsyncInputStream *stream,
81 nsIInputStreamCallback *callback)
83 NS_ASSERTION(!mInputCallback, "already have an input event");
84 mInputStream = stream;
85 mInputCallback = callback;
88 inline void NotifyOutputReady(nsIAsyncOutputStream *stream,
89 nsIOutputStreamCallback *callback)
91 NS_ASSERTION(!mOutputCallback, "already have an output event");
92 mOutputStream = stream;
93 mOutputCallback = callback;
96 private:
97 nsCOMPtr<nsIAsyncInputStream> mInputStream;
98 nsCOMPtr<nsIInputStreamCallback> mInputCallback;
99 nsCOMPtr<nsIAsyncOutputStream> mOutputStream;
100 nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
103 //-----------------------------------------------------------------------------
105 // the input end of a pipe (allocated as a member of the pipe).
106 class nsPipeInputStream : public nsIAsyncInputStream
107 , public nsISeekableStream
108 , public nsISearchableInputStream
109 , public nsIClassInfo
111 public:
112 // since this class will be allocated as a member of the pipe, we do not
113 // need our own ref count. instead, we share the lifetime (the ref count)
114 // of the entire pipe. this macro is just convenience since it does not
115 // declare a mRefCount variable; however, don't let the name fool you...
116 // we are not inheriting from nsPipe ;-)
117 NS_DECL_ISUPPORTS_INHERITED
119 NS_DECL_NSIINPUTSTREAM
120 NS_DECL_NSIASYNCINPUTSTREAM
121 NS_DECL_NSISEEKABLESTREAM
122 NS_DECL_NSISEARCHABLEINPUTSTREAM
123 NS_DECL_NSICLASSINFO
125 nsPipeInputStream(nsPipe *pipe)
126 : mPipe(pipe)
127 , mReaderRefCnt(0)
128 , mLogicalOffset(0)
129 , mBlocking(PR_TRUE)
130 , mBlocked(PR_FALSE)
131 , mAvailable(0)
132 , mCallbackFlags(0)
135 nsresult Fill();
136 void SetNonBlocking(PRBool aNonBlocking) { mBlocking = !aNonBlocking; }
138 PRUint32 Available() { return mAvailable; }
139 void ReduceAvailable(PRUint32 avail) { mAvailable -= avail; }
141 // synchronously wait for the pipe to become readable.
142 nsresult Wait();
144 // these functions return true to indicate that the pipe's monitor should
145 // be notified, to wake up a blocked reader if any.
146 PRBool OnInputReadable(PRUint32 bytesWritten, nsPipeEvents &);
147 PRBool OnInputException(nsresult, nsPipeEvents &);
149 private:
150 nsPipe *mPipe;
152 // separate refcnt so that we know when to close the consumer
153 nsrefcnt mReaderRefCnt;
154 nsInt64 mLogicalOffset;
155 PRPackedBool mBlocking;
157 // these variables can only be accessed while inside the pipe's monitor
158 PRPackedBool mBlocked;
159 PRUint32 mAvailable;
160 nsCOMPtr<nsIInputStreamCallback> mCallback;
161 PRUint32 mCallbackFlags;
164 //-----------------------------------------------------------------------------
166 // the output end of a pipe (allocated as a member of the pipe).
167 class nsPipeOutputStream : public nsIAsyncOutputStream
168 , public nsIClassInfo
170 public:
171 // since this class will be allocated as a member of the pipe, we do not
172 // need our own ref count. instead, we share the lifetime (the ref count)
173 // of the entire pipe. this macro is just convenience since it does not
174 // declare a mRefCount variable; however, don't let the name fool you...
175 // we are not inheriting from nsPipe ;-)
176 NS_DECL_ISUPPORTS_INHERITED
178 NS_DECL_NSIOUTPUTSTREAM
179 NS_DECL_NSIASYNCOUTPUTSTREAM
180 NS_DECL_NSICLASSINFO
182 nsPipeOutputStream(nsPipe *pipe)
183 : mPipe(pipe)
184 , mWriterRefCnt(0)
185 , mLogicalOffset(0)
186 , mBlocking(PR_TRUE)
187 , mBlocked(PR_FALSE)
188 , mWritable(PR_TRUE)
189 , mCallbackFlags(0)
192 void SetNonBlocking(PRBool aNonBlocking) { mBlocking = !aNonBlocking; }
193 void SetWritable(PRBool writable) { mWritable = writable; }
195 // synchronously wait for the pipe to become writable.
196 nsresult Wait();
198 // these functions return true to indicate that the pipe's monitor should
199 // be notified, to wake up a blocked writer if any.
200 PRBool OnOutputWritable(nsPipeEvents &);
201 PRBool OnOutputException(nsresult, nsPipeEvents &);
203 private:
204 nsPipe *mPipe;
206 // separate refcnt so that we know when to close the producer
207 nsrefcnt mWriterRefCnt;
208 nsInt64 mLogicalOffset;
209 PRPackedBool mBlocking;
211 // these variables can only be accessed while inside the pipe's monitor
212 PRPackedBool mBlocked;
213 PRPackedBool mWritable;
214 nsCOMPtr<nsIOutputStreamCallback> mCallback;
215 PRUint32 mCallbackFlags;
218 //-----------------------------------------------------------------------------
220 class nsPipe : public nsIPipe
222 public:
223 friend class nsPipeInputStream;
224 friend class nsPipeOutputStream;
226 NS_DECL_ISUPPORTS
227 NS_DECL_NSIPIPE
229 // nsPipe methods:
230 nsPipe();
232 private:
233 ~nsPipe();
235 public:
237 // methods below may only be called while inside the pipe's monitor
240 void PeekSegment(PRUint32 n, char *&cursor, char *&limit);
243 // methods below may be called while outside the pipe's monitor
246 nsresult GetReadSegment(const char *&segment, PRUint32 &segmentLen);
247 void AdvanceReadCursor(PRUint32 count);
249 nsresult GetWriteSegment(char *&segment, PRUint32 &segmentLen);
250 void AdvanceWriteCursor(PRUint32 count);
252 void OnPipeException(nsresult reason, PRBool outputOnly = PR_FALSE);
254 protected:
255 // We can't inherit from both nsIInputStream and nsIOutputStream
256 // because they collide on their Close method. Consequently we nest their
257 // implementations to avoid the extra object allocation.
258 nsPipeInputStream mInput;
259 nsPipeOutputStream mOutput;
261 PRMonitor* mMonitor;
262 nsSegmentedBuffer mBuffer;
264 char* mReadCursor;
265 char* mReadLimit;
267 PRInt32 mWriteSegment;
268 char* mWriteCursor;
269 char* mWriteLimit;
271 nsresult mStatus;
275 // NOTES on buffer architecture:
277 // +-----------------+ - - mBuffer.GetSegment(0)
278 // | |
279 // + - - - - - - - - + - - mReadCursor
280 // |/////////////////|
281 // |/////////////////|
282 // |/////////////////|
283 // |/////////////////|
284 // +-----------------+ - - mReadLimit
285 // |
286 // +-----------------+
287 // |/////////////////|
288 // |/////////////////|
289 // |/////////////////|
290 // |/////////////////|
291 // |/////////////////|
292 // |/////////////////|
293 // +-----------------+
294 // |
295 // +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
296 // |/////////////////|
297 // |/////////////////|
298 // |/////////////////|
299 // + - - - - - - - - + - - mWriteCursor
300 // | |
301 // | |
302 // +-----------------+ - - mWriteLimit
304 // (shaded region contains data)
306 // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
307 // small allocations (e.g., 64 byte allocations). this means that buffers may
308 // be allocated back-to-back. in the diagram above, for example, mReadLimit
309 // would actually be pointing at the beginning of the next segment. when
310 // making changes to this file, please keep this fact in mind.
313 //-----------------------------------------------------------------------------
314 // nsPipe methods:
315 //-----------------------------------------------------------------------------
317 nsPipe::nsPipe()
318 : mInput(this)
319 , mOutput(this)
320 , mMonitor(nsnull)
321 , mReadCursor(nsnull)
322 , mReadLimit(nsnull)
323 , mWriteSegment(-1)
324 , mWriteCursor(nsnull)
325 , mWriteLimit(nsnull)
326 , mStatus(NS_OK)
330 nsPipe::~nsPipe()
332 if (mMonitor)
333 nsAutoMonitor::DestroyMonitor(mMonitor);
336 NS_IMPL_THREADSAFE_ISUPPORTS1(nsPipe, nsIPipe)
338 NS_IMETHODIMP
339 nsPipe::Init(PRBool nonBlockingIn,
340 PRBool nonBlockingOut,
341 PRUint32 segmentSize,
342 PRUint32 segmentCount,
343 nsIMemory *segmentAlloc)
345 mMonitor = nsAutoMonitor::NewMonitor("pipeMonitor");
346 if (!mMonitor)
347 return NS_ERROR_OUT_OF_MEMORY;
349 if (segmentSize == 0)
350 segmentSize = DEFAULT_SEGMENT_SIZE;
351 if (segmentCount == 0)
352 segmentCount = DEFAULT_SEGMENT_COUNT;
354 // protect against overflow
355 PRUint32 maxCount = PRUint32(-1) / segmentSize;
356 if (segmentCount > maxCount)
357 segmentCount = maxCount;
359 nsresult rv = mBuffer.Init(segmentSize, segmentSize * segmentCount, segmentAlloc);
360 if (NS_FAILED(rv))
361 return rv;
363 mInput.SetNonBlocking(nonBlockingIn);
364 mOutput.SetNonBlocking(nonBlockingOut);
365 return NS_OK;
368 NS_IMETHODIMP
369 nsPipe::GetInputStream(nsIAsyncInputStream **aInputStream)
371 NS_ENSURE_TRUE(mMonitor, NS_ERROR_NOT_INITIALIZED);
372 NS_ADDREF(*aInputStream = &mInput);
373 return NS_OK;
376 NS_IMETHODIMP
377 nsPipe::GetOutputStream(nsIAsyncOutputStream **aOutputStream)
379 NS_ENSURE_TRUE(mMonitor, NS_ERROR_NOT_INITIALIZED);
380 NS_ADDREF(*aOutputStream = &mOutput);
381 return NS_OK;
384 void
385 nsPipe::PeekSegment(PRUint32 index, char *&cursor, char *&limit)
387 if (index == 0) {
388 NS_ASSERTION(!mReadCursor || mBuffer.GetSegmentCount(), "unexpected state");
389 cursor = mReadCursor;
390 limit = mReadLimit;
392 else {
393 PRUint32 numSegments = mBuffer.GetSegmentCount();
394 if (index >= numSegments)
395 cursor = limit = nsnull;
396 else {
397 cursor = mBuffer.GetSegment(index);
398 if (mWriteSegment == (PRInt32) index)
399 limit = mWriteCursor;
400 else
401 limit = cursor + mBuffer.GetSegmentSize();
406 nsresult
407 nsPipe::GetReadSegment(const char *&segment, PRUint32 &segmentLen)
409 nsAutoMonitor mon(mMonitor);
411 if (mReadCursor == mReadLimit)
412 return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
414 segment = mReadCursor;
415 segmentLen = mReadLimit - mReadCursor;
416 return NS_OK;
419 void
420 nsPipe::AdvanceReadCursor(PRUint32 bytesRead)
422 NS_ASSERTION(bytesRead, "don't call if no bytes read");
424 nsPipeEvents events;
426 nsAutoMonitor mon(mMonitor);
428 LOG(("III advancing read cursor by %u\n", bytesRead));
429 NS_ASSERTION(bytesRead <= mBuffer.GetSegmentSize(), "read too much");
431 mReadCursor += bytesRead;
432 NS_ASSERTION(mReadCursor <= mReadLimit, "read cursor exceeds limit");
434 mInput.ReduceAvailable(bytesRead);
436 if (mReadCursor == mReadLimit) {
437 // we've reached the limit of how much we can read from this segment.
438 // if at the end of this segment, then we must discard this segment.
440 // if still writing in this segment then bail because we're not done
441 // with the segment and have to wait for now...
442 if (mWriteSegment == 0 && mWriteLimit > mWriteCursor) {
443 NS_ASSERTION(mReadLimit == mWriteCursor, "unexpected state");
444 return;
447 // shift write segment index (-1 indicates an empty buffer).
448 --mWriteSegment;
450 // done with this segment
451 mBuffer.DeleteFirstSegment();
452 LOG(("III deleting first segment\n"));
454 if (mWriteSegment == -1) {
455 // buffer is completely empty
456 mReadCursor = nsnull;
457 mReadLimit = nsnull;
458 mWriteCursor = nsnull;
459 mWriteLimit = nsnull;
461 else {
462 // advance read cursor and limit to next buffer segment
463 mReadCursor = mBuffer.GetSegment(0);
464 if (mWriteSegment == 0)
465 mReadLimit = mWriteCursor;
466 else
467 mReadLimit = mReadCursor + mBuffer.GetSegmentSize();
470 // we've free'd up a segment, so notify output stream that pipe has
471 // room for a new segment.
472 if (mOutput.OnOutputWritable(events))
473 mon.Notify();
478 nsresult
479 nsPipe::GetWriteSegment(char *&segment, PRUint32 &segmentLen)
481 nsAutoMonitor mon(mMonitor);
483 if (NS_FAILED(mStatus))
484 return mStatus;
486 // write cursor and limit may both be null indicating an empty buffer.
487 if (mWriteCursor == mWriteLimit) {
488 char *seg = mBuffer.AppendNewSegment();
489 // pipe is full
490 if (seg == nsnull)
491 return NS_BASE_STREAM_WOULD_BLOCK;
492 LOG(("OOO appended new segment\n"));
493 mWriteCursor = seg;
494 mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
495 ++mWriteSegment;
498 // make sure read cursor is initialized
499 if (mReadCursor == nsnull) {
500 NS_ASSERTION(mWriteSegment == 0, "unexpected null read cursor");
501 mReadCursor = mReadLimit = mWriteCursor;
504 // check to see if we can roll-back our read and write cursors to the
505 // beginning of the current/first segment. this is purely an optimization.
506 if (mReadCursor == mWriteCursor && mWriteSegment == 0) {
507 char *head = mBuffer.GetSegment(0);
508 LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head));
509 mWriteCursor = mReadCursor = mReadLimit = head;
512 segment = mWriteCursor;
513 segmentLen = mWriteLimit - mWriteCursor;
514 return NS_OK;
517 void
518 nsPipe::AdvanceWriteCursor(PRUint32 bytesWritten)
520 NS_ASSERTION(bytesWritten, "don't call if no bytes written");
522 nsPipeEvents events;
524 nsAutoMonitor mon(mMonitor);
526 LOG(("OOO advancing write cursor by %u\n", bytesWritten));
528 char *newWriteCursor = mWriteCursor + bytesWritten;
529 NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit");
531 // update read limit if reading in the same segment
532 if (mWriteSegment == 0 && mReadLimit == mWriteCursor)
533 mReadLimit = newWriteCursor;
535 mWriteCursor = newWriteCursor;
537 // The only way mReadCursor == mWriteCursor is if:
539 // - mReadCursor is at the start of a segment (which, based on how
540 // nsSegmentedBuffer works, means that this segment is the "first"
541 // segment)
542 // - mWriteCursor points at the location past the end of the current
543 // write segment (so the current write filled the current write
544 // segment, so we've incremented mWriteCursor to point past the end
545 // of it)
546 // - the segment to which data has just been written is located
547 // exactly one segment's worth of bytes before the first segment
548 // where mReadCursor is located
550 // Consequently, the byte immediately after the end of the current
551 // write segment is the first byte of the first segment, so
552 // mReadCursor == mWriteCursor. (Another way to think about this is
553 // to consider the buffer architecture diagram above, but consider it
554 // with an arena allocator which allocates from the *end* of the
555 // arena to the *beginning* of the arena.)
556 NS_ASSERTION(mReadCursor != mWriteCursor ||
557 (mBuffer.GetSegment(0) == mReadCursor &&
558 mWriteCursor == mWriteLimit),
559 "read cursor is bad");
561 // update the writable flag on the output stream
562 if (mWriteCursor == mWriteLimit) {
563 if (mBuffer.GetSize() >= mBuffer.GetMaxSize())
564 mOutput.SetWritable(PR_FALSE);
567 // notify input stream that pipe now contains additional data
568 if (mInput.OnInputReadable(bytesWritten, events))
569 mon.Notify();
573 void
574 nsPipe::OnPipeException(nsresult reason, PRBool outputOnly)
576 LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n",
577 reason, outputOnly));
579 nsPipeEvents events;
581 nsAutoMonitor mon(mMonitor);
583 // if we've already hit an exception, then ignore this one.
584 if (NS_FAILED(mStatus))
585 return;
587 mStatus = reason;
589 // an output-only exception applies to the input end if the pipe has
590 // zero bytes available.
591 if (outputOnly && !mInput.Available())
592 outputOnly = PR_FALSE;
594 if (!outputOnly)
595 if (mInput.OnInputException(reason, events))
596 mon.Notify();
598 if (mOutput.OnOutputException(reason, events))
599 mon.Notify();
603 //-----------------------------------------------------------------------------
604 // nsPipeEvents methods:
605 //-----------------------------------------------------------------------------
607 nsPipeEvents::~nsPipeEvents()
609 // dispatch any pending events
611 if (mInputCallback) {
612 mInputCallback->OnInputStreamReady(mInputStream);
613 mInputCallback = 0;
614 mInputStream = 0;
616 if (mOutputCallback) {
617 mOutputCallback->OnOutputStreamReady(mOutputStream);
618 mOutputCallback = 0;
619 mOutputStream = 0;
623 //-----------------------------------------------------------------------------
624 // nsPipeInputStream methods:
625 //-----------------------------------------------------------------------------
627 NS_IMPL_QUERY_INTERFACE5(nsPipeInputStream,
628 nsIInputStream,
629 nsIAsyncInputStream,
630 nsISeekableStream,
631 nsISearchableInputStream,
632 nsIClassInfo)
634 NS_IMPL_CI_INTERFACE_GETTER4(nsPipeInputStream,
635 nsIInputStream,
636 nsIAsyncInputStream,
637 nsISeekableStream,
638 nsISearchableInputStream)
640 NS_IMPL_THREADSAFE_CI(nsPipeInputStream)
642 nsresult
643 nsPipeInputStream::Wait()
645 NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream");
647 nsAutoMonitor mon(mPipe->mMonitor);
649 while (NS_SUCCEEDED(mPipe->mStatus) && (mAvailable == 0)) {
650 LOG(("III pipe input: waiting for data\n"));
652 mBlocked = PR_TRUE;
653 mon.Wait();
654 mBlocked = PR_FALSE;
656 LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n",
657 mPipe->mStatus, mAvailable));
660 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
663 PRBool
664 nsPipeInputStream::OnInputReadable(PRUint32 bytesWritten, nsPipeEvents &events)
666 PRBool result = PR_FALSE;
668 mAvailable += bytesWritten;
670 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
671 events.NotifyInputReady(this, mCallback);
672 mCallback = 0;
673 mCallbackFlags = 0;
675 else if (mBlocked)
676 result = PR_TRUE;
678 return result;
681 PRBool
682 nsPipeInputStream::OnInputException(nsresult reason, nsPipeEvents &events)
684 LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
685 this, reason));
687 PRBool result = PR_FALSE;
689 NS_ASSERTION(NS_FAILED(reason), "huh? successful exception");
691 // force count of available bytes to zero.
692 mAvailable = 0;
694 if (mCallback) {
695 events.NotifyInputReady(this, mCallback);
696 mCallback = 0;
697 mCallbackFlags = 0;
699 else if (mBlocked)
700 result = PR_TRUE;
702 return result;
705 NS_IMETHODIMP_(nsrefcnt)
706 nsPipeInputStream::AddRef(void)
708 PR_AtomicIncrement((PRInt32*)&mReaderRefCnt);
709 return mPipe->AddRef();
712 NS_IMETHODIMP_(nsrefcnt)
713 nsPipeInputStream::Release(void)
715 if (PR_AtomicDecrement((PRInt32 *)&mReaderRefCnt) == 0)
716 Close();
717 return mPipe->Release();
720 NS_IMETHODIMP
721 nsPipeInputStream::CloseWithStatus(nsresult reason)
723 LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, reason));
725 if (NS_SUCCEEDED(reason))
726 reason = NS_BASE_STREAM_CLOSED;
728 mPipe->OnPipeException(reason);
729 return NS_OK;
732 NS_IMETHODIMP
733 nsPipeInputStream::Close()
735 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
738 NS_IMETHODIMP
739 nsPipeInputStream::Available(PRUint32 *result)
741 nsAutoMonitor mon(mPipe->mMonitor);
743 // return error if pipe closed
744 if (!mAvailable && NS_FAILED(mPipe->mStatus))
745 return mPipe->mStatus;
747 *result = mAvailable;
748 return NS_OK;
751 NS_IMETHODIMP
752 nsPipeInputStream::ReadSegments(nsWriteSegmentFun writer,
753 void *closure,
754 PRUint32 count,
755 PRUint32 *readCount)
757 LOG(("III ReadSegments [this=%x count=%u]\n", this, count));
759 nsresult rv = NS_OK;
761 const char *segment;
762 PRUint32 segmentLen;
764 *readCount = 0;
765 while (count) {
766 rv = mPipe->GetReadSegment(segment, segmentLen);
767 if (NS_FAILED(rv)) {
768 // ignore this error if we've already read something.
769 if (*readCount > 0) {
770 rv = NS_OK;
771 break;
773 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
774 // pipe is empty
775 if (!mBlocking)
776 break;
777 // wait for some data to be written to the pipe
778 rv = Wait();
779 if (NS_SUCCEEDED(rv))
780 continue;
782 // ignore this error, just return.
783 if (rv == NS_BASE_STREAM_CLOSED) {
784 rv = NS_OK;
785 break;
787 mPipe->OnPipeException(rv);
788 break;
791 // read no more than count
792 if (segmentLen > count)
793 segmentLen = count;
795 PRUint32 writeCount, originalLen = segmentLen;
796 while (segmentLen) {
797 writeCount = 0;
799 rv = writer(this, closure, segment, *readCount, segmentLen, &writeCount);
801 if (NS_FAILED(rv) || writeCount == 0) {
802 count = 0;
803 // any errors returned from the writer end here: do not
804 // propagate to the caller of ReadSegments.
805 rv = NS_OK;
806 break;
809 NS_ASSERTION(writeCount <= segmentLen, "wrote more than expected");
810 segment += writeCount;
811 segmentLen -= writeCount;
812 count -= writeCount;
813 *readCount += writeCount;
814 mLogicalOffset += writeCount;
817 if (segmentLen < originalLen)
818 mPipe->AdvanceReadCursor(originalLen - segmentLen);
821 return rv;
824 NS_IMETHODIMP
825 nsPipeInputStream::Read(char* toBuf, PRUint32 bufLen, PRUint32 *readCount)
827 return ReadSegments(NS_CopySegmentToBuffer, toBuf, bufLen, readCount);
830 NS_IMETHODIMP
831 nsPipeInputStream::IsNonBlocking(PRBool *aNonBlocking)
833 *aNonBlocking = !mBlocking;
834 return NS_OK;
837 NS_IMETHODIMP
838 nsPipeInputStream::AsyncWait(nsIInputStreamCallback *callback,
839 PRUint32 flags,
840 PRUint32 requestedCount,
841 nsIEventTarget *target)
843 LOG(("III AsyncWait [this=%x]\n", this));
845 nsPipeEvents pipeEvents;
847 nsAutoMonitor mon(mPipe->mMonitor);
849 // replace a pending callback
850 mCallback = 0;
851 mCallbackFlags = 0;
853 if (!callback)
854 return NS_OK;
856 nsCOMPtr<nsIInputStreamCallback> proxy;
857 if (target) {
858 nsresult rv = NS_NewInputStreamReadyEvent(getter_AddRefs(proxy),
859 callback, target);
860 if (NS_FAILED(rv)) return rv;
861 callback = proxy;
864 if (NS_FAILED(mPipe->mStatus) ||
865 (mAvailable && !(flags & WAIT_CLOSURE_ONLY))) {
866 // stream is already closed or readable; post event.
867 pipeEvents.NotifyInputReady(this, callback);
869 else {
870 // queue up callback object to be notified when data becomes available
871 mCallback = callback;
872 mCallbackFlags = flags;
875 return NS_OK;
878 NS_IMETHODIMP
879 nsPipeInputStream::Seek(PRInt32 whence, PRInt64 offset)
881 NS_NOTREACHED("nsPipeInputStream::Seek");
882 return NS_ERROR_NOT_IMPLEMENTED;
885 NS_IMETHODIMP
886 nsPipeInputStream::Tell(PRInt64 *offset)
888 nsAutoMonitor mon(mPipe->mMonitor);
890 // return error if pipe closed
891 if (!mAvailable && NS_FAILED(mPipe->mStatus))
892 return mPipe->mStatus;
894 *offset = mLogicalOffset;
895 return NS_OK;
898 NS_IMETHODIMP
899 nsPipeInputStream::SetEOF()
901 NS_NOTREACHED("nsPipeInputStream::SetEOF");
902 return NS_ERROR_NOT_IMPLEMENTED;
905 #define COMPARE(s1, s2, i) \
906 (ignoreCase \
907 ? nsCRT::strncasecmp((const char *)s1, (const char *)s2, (PRUint32)i) \
908 : nsCRT::strncmp((const char *)s1, (const char *)s2, (PRUint32)i))
910 NS_IMETHODIMP
911 nsPipeInputStream::Search(const char *forString,
912 PRBool ignoreCase,
913 PRBool *found,
914 PRUint32 *offsetSearchedTo)
916 LOG(("III Search [for=%s ic=%u]\n", forString, ignoreCase));
918 nsAutoMonitor mon(mPipe->mMonitor);
920 char *cursor1, *limit1;
921 PRUint32 index = 0, offset = 0;
922 PRUint32 strLen = strlen(forString);
924 mPipe->PeekSegment(0, cursor1, limit1);
925 if (cursor1 == limit1) {
926 *found = PR_FALSE;
927 *offsetSearchedTo = 0;
928 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
929 return NS_OK;
932 while (PR_TRUE) {
933 PRUint32 i, len1 = limit1 - cursor1;
935 // check if the string is in the buffer segment
936 for (i = 0; i < len1 - strLen + 1; i++) {
937 if (COMPARE(&cursor1[i], forString, strLen) == 0) {
938 *found = PR_TRUE;
939 *offsetSearchedTo = offset + i;
940 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
941 return NS_OK;
945 // get the next segment
946 char *cursor2, *limit2;
947 PRUint32 len2;
949 index++;
950 offset += len1;
952 mPipe->PeekSegment(index, cursor2, limit2);
953 if (cursor2 == limit2) {
954 *found = PR_FALSE;
955 *offsetSearchedTo = offset - strLen + 1;
956 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
957 return NS_OK;
959 len2 = limit2 - cursor2;
961 // check if the string is straddling the next buffer segment
962 PRUint32 lim = PR_MIN(strLen, len2 + 1);
963 for (i = 0; i < lim; ++i) {
964 PRUint32 strPart1Len = strLen - i - 1;
965 PRUint32 strPart2Len = strLen - strPart1Len;
966 const char* strPart2 = &forString[strLen - strPart2Len];
967 PRUint32 bufSeg1Offset = len1 - strPart1Len;
968 if (COMPARE(&cursor1[bufSeg1Offset], forString, strPart1Len) == 0 &&
969 COMPARE(cursor2, strPart2, strPart2Len) == 0) {
970 *found = PR_TRUE;
971 *offsetSearchedTo = offset - strPart1Len;
972 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
973 return NS_OK;
977 // finally continue with the next buffer
978 cursor1 = cursor2;
979 limit1 = limit2;
982 NS_NOTREACHED("can't get here");
983 return NS_ERROR_UNEXPECTED; // keep compiler happy
986 //-----------------------------------------------------------------------------
987 // nsPipeOutputStream methods:
988 //-----------------------------------------------------------------------------
990 NS_IMPL_QUERY_INTERFACE3(nsPipeOutputStream,
991 nsIOutputStream,
992 nsIAsyncOutputStream,
993 nsIClassInfo)
995 NS_IMPL_CI_INTERFACE_GETTER2(nsPipeOutputStream,
996 nsIOutputStream,
997 nsIAsyncOutputStream)
999 NS_IMPL_THREADSAFE_CI(nsPipeOutputStream)
1001 nsresult
1002 nsPipeOutputStream::Wait()
1004 NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream");
1006 nsAutoMonitor mon(mPipe->mMonitor);
1008 if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) {
1009 LOG(("OOO pipe output: waiting for space\n"));
1010 mBlocked = PR_TRUE;
1011 mon.Wait();
1012 mBlocked = PR_FALSE;
1013 LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n",
1014 mPipe->mStatus, mWritable));
1017 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
1020 PRBool
1021 nsPipeOutputStream::OnOutputWritable(nsPipeEvents &events)
1023 PRBool result = PR_FALSE;
1025 mWritable = PR_TRUE;
1027 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
1028 events.NotifyOutputReady(this, mCallback);
1029 mCallback = 0;
1030 mCallbackFlags = 0;
1032 else if (mBlocked)
1033 result = PR_TRUE;
1035 return result;
1038 PRBool
1039 nsPipeOutputStream::OnOutputException(nsresult reason, nsPipeEvents &events)
1041 LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n",
1042 this, reason));
1044 PRBool result = PR_FALSE;
1046 NS_ASSERTION(NS_FAILED(reason), "huh? successful exception");
1047 mWritable = PR_FALSE;
1049 if (mCallback) {
1050 events.NotifyOutputReady(this, mCallback);
1051 mCallback = 0;
1052 mCallbackFlags = 0;
1054 else if (mBlocked)
1055 result = PR_TRUE;
1057 return result;
1061 NS_IMETHODIMP_(nsrefcnt)
1062 nsPipeOutputStream::AddRef()
1064 PR_AtomicIncrement((PRInt32*)&mWriterRefCnt);
1065 return mPipe->AddRef();
1068 NS_IMETHODIMP_(nsrefcnt)
1069 nsPipeOutputStream::Release()
1071 if (PR_AtomicDecrement((PRInt32 *)&mWriterRefCnt) == 0)
1072 Close();
1073 return mPipe->Release();
1076 NS_IMETHODIMP
1077 nsPipeOutputStream::CloseWithStatus(nsresult reason)
1079 LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, reason));
1081 if (NS_SUCCEEDED(reason))
1082 reason = NS_BASE_STREAM_CLOSED;
1084 // input stream may remain open
1085 mPipe->OnPipeException(reason, PR_TRUE);
1086 return NS_OK;
1089 NS_IMETHODIMP
1090 nsPipeOutputStream::Close()
1092 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
1095 NS_IMETHODIMP
1096 nsPipeOutputStream::WriteSegments(nsReadSegmentFun reader,
1097 void* closure,
1098 PRUint32 count,
1099 PRUint32 *writeCount)
1101 LOG(("OOO WriteSegments [this=%x count=%u]\n", this, count));
1103 nsresult rv = NS_OK;
1105 char *segment;
1106 PRUint32 segmentLen;
1108 *writeCount = 0;
1109 while (count) {
1110 rv = mPipe->GetWriteSegment(segment, segmentLen);
1111 if (NS_FAILED(rv)) {
1112 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
1113 // pipe is full
1114 if (!mBlocking) {
1115 // ignore this error if we've already written something
1116 if (*writeCount > 0)
1117 rv = NS_OK;
1118 break;
1120 // wait for the pipe to have an empty segment.
1121 rv = Wait();
1122 if (NS_SUCCEEDED(rv))
1123 continue;
1125 mPipe->OnPipeException(rv);
1126 break;
1129 // write no more than count
1130 if (segmentLen > count)
1131 segmentLen = count;
1133 PRUint32 readCount, originalLen = segmentLen;
1134 while (segmentLen) {
1135 readCount = 0;
1137 rv = reader(this, closure, segment, *writeCount, segmentLen, &readCount);
1139 if (NS_FAILED(rv) || readCount == 0) {
1140 count = 0;
1141 // any errors returned from the reader end here: do not
1142 // propagate to the caller of WriteSegments.
1143 rv = NS_OK;
1144 break;
1147 NS_ASSERTION(readCount <= segmentLen, "read more than expected");
1148 segment += readCount;
1149 segmentLen -= readCount;
1150 count -= readCount;
1151 *writeCount += readCount;
1152 mLogicalOffset += readCount;
1155 if (segmentLen < originalLen)
1156 mPipe->AdvanceWriteCursor(originalLen - segmentLen);
1159 return rv;
1162 static NS_METHOD
1163 nsReadFromRawBuffer(nsIOutputStream* outStr,
1164 void* closure,
1165 char* toRawSegment,
1166 PRUint32 offset,
1167 PRUint32 count,
1168 PRUint32 *readCount)
1170 const char* fromBuf = (const char*)closure;
1171 memcpy(toRawSegment, &fromBuf[offset], count);
1172 *readCount = count;
1173 return NS_OK;
1176 NS_IMETHODIMP
1177 nsPipeOutputStream::Write(const char* fromBuf,
1178 PRUint32 bufLen,
1179 PRUint32 *writeCount)
1181 return WriteSegments(nsReadFromRawBuffer, (void*)fromBuf, bufLen, writeCount);
1184 NS_IMETHODIMP
1185 nsPipeOutputStream::Flush(void)
1187 // nothing to do
1188 return NS_OK;
1191 static NS_METHOD
1192 nsReadFromInputStream(nsIOutputStream* outStr,
1193 void* closure,
1194 char* toRawSegment,
1195 PRUint32 offset,
1196 PRUint32 count,
1197 PRUint32 *readCount)
1199 nsIInputStream* fromStream = (nsIInputStream*)closure;
1200 return fromStream->Read(toRawSegment, count, readCount);
1203 NS_IMETHODIMP
1204 nsPipeOutputStream::WriteFrom(nsIInputStream* fromStream,
1205 PRUint32 count,
1206 PRUint32 *writeCount)
1208 return WriteSegments(nsReadFromInputStream, fromStream, count, writeCount);
1211 NS_IMETHODIMP
1212 nsPipeOutputStream::IsNonBlocking(PRBool *aNonBlocking)
1214 *aNonBlocking = !mBlocking;
1215 return NS_OK;
1218 NS_IMETHODIMP
1219 nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback *callback,
1220 PRUint32 flags,
1221 PRUint32 requestedCount,
1222 nsIEventTarget *target)
1224 LOG(("OOO AsyncWait [this=%x]\n", this));
1226 nsPipeEvents pipeEvents;
1228 nsAutoMonitor mon(mPipe->mMonitor);
1230 // replace a pending callback
1231 mCallback = 0;
1232 mCallbackFlags = 0;
1234 if (!callback)
1235 return NS_OK;
1237 nsCOMPtr<nsIOutputStreamCallback> proxy;
1238 if (target) {
1239 nsresult rv = NS_NewOutputStreamReadyEvent(getter_AddRefs(proxy),
1240 callback, target);
1241 if (NS_FAILED(rv)) return rv;
1242 callback = proxy;
1245 if (NS_FAILED(mPipe->mStatus) ||
1246 (mWritable && !(flags & WAIT_CLOSURE_ONLY))) {
1247 // stream is already closed or writable; post event.
1248 pipeEvents.NotifyOutputReady(this, callback);
1250 else {
1251 // queue up callback object to be notified when data becomes available
1252 mCallback = callback;
1253 mCallbackFlags = flags;
1256 return NS_OK;
1259 ////////////////////////////////////////////////////////////////////////////////
1261 NS_COM nsresult
1262 NS_NewPipe(nsIInputStream **pipeIn,
1263 nsIOutputStream **pipeOut,
1264 PRUint32 segmentSize,
1265 PRUint32 maxSize,
1266 PRBool nonBlockingInput,
1267 PRBool nonBlockingOutput,
1268 nsIMemory *segmentAlloc)
1270 if (segmentSize == 0)
1271 segmentSize = DEFAULT_SEGMENT_SIZE;
1273 // Handle maxSize of PR_UINT32_MAX as a special case
1274 PRUint32 segmentCount;
1275 if (maxSize == PR_UINT32_MAX)
1276 segmentCount = PR_UINT32_MAX;
1277 else
1278 segmentCount = maxSize / segmentSize;
1280 nsIAsyncInputStream *in;
1281 nsIAsyncOutputStream *out;
1282 nsresult rv = NS_NewPipe2(&in, &out, nonBlockingInput, nonBlockingOutput,
1283 segmentSize, segmentCount, segmentAlloc);
1284 if (NS_FAILED(rv)) return rv;
1286 *pipeIn = in;
1287 *pipeOut = out;
1288 return NS_OK;
1291 NS_COM nsresult
1292 NS_NewPipe2(nsIAsyncInputStream **pipeIn,
1293 nsIAsyncOutputStream **pipeOut,
1294 PRBool nonBlockingInput,
1295 PRBool nonBlockingOutput,
1296 PRUint32 segmentSize,
1297 PRUint32 segmentCount,
1298 nsIMemory *segmentAlloc)
1300 nsresult rv;
1302 nsPipe *pipe = new nsPipe();
1303 if (!pipe)
1304 return NS_ERROR_OUT_OF_MEMORY;
1306 rv = pipe->Init(nonBlockingInput,
1307 nonBlockingOutput,
1308 segmentSize,
1309 segmentCount,
1310 segmentAlloc);
1311 if (NS_FAILED(rv)) {
1312 NS_ADDREF(pipe);
1313 NS_RELEASE(pipe);
1314 return rv;
1317 pipe->GetInputStream(pipeIn);
1318 pipe->GetOutputStream(pipeOut);
1319 return NS_OK;
1322 nsresult
1323 nsPipeConstructor(nsISupports *outer, REFNSIID iid, void **result)
1325 if (outer)
1326 return NS_ERROR_NO_AGGREGATION;
1327 nsPipe *pipe = new nsPipe();
1328 if (!pipe)
1329 return NS_ERROR_OUT_OF_MEMORY;
1330 NS_ADDREF(pipe);
1331 nsresult rv = pipe->QueryInterface(iid, result);
1332 NS_RELEASE(pipe);
1333 return rv;
1336 ////////////////////////////////////////////////////////////////////////////////