1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 #include "gtest/gtest.h"
10 #include "mozilla/gtest/MozAssertions.h"
11 #include "mozilla/ReentrantMonitor.h"
12 #include "mozilla/Printf.h"
15 #include "nsIAsyncInputStream.h"
16 #include "nsIAsyncOutputStream.h"
17 #include "nsIBufferedStreams.h"
18 #include "nsIClassInfo.h"
19 #include "nsICloneableInputStream.h"
20 #include "nsIInputStream.h"
21 #include "nsIOutputStream.h"
23 #include "nsITellableStream.h"
24 #include "nsIThread.h"
25 #include "nsIRunnable.h"
26 #include "nsStreamUtils.h"
28 #include "nsThreadUtils.h"
31 using namespace mozilla
;
// Number of records each writer loop in this file pushes through a pipe.
33 #define ITERATIONS 33333
// Payload text; the writer loops format it as "%d %s" together with the
// iteration index before writing it to the pipe.
// NOTE(review): this array is mutable with external linkage — consider
// `static const` if nothing outside this excerpt modifies or references it.
34 char kTestPattern
[] = "My hovercraft is full of eels.\n";
38 static nsresult
WriteAll(nsIOutputStream
* os
, const char* buf
, uint32_t bufLen
,
39 uint32_t* lenWritten
) {
44 nsresult rv
= os
->Write(p
, bufLen
, &n
);
45 if (NS_FAILED(rv
)) return rv
;
53 class nsReceiver final
: public Runnable
{
55 NS_IMETHOD
Run() override
{
59 PRIntervalTime start
= PR_IntervalNow();
61 rv
= mIn
->Read(buf
, 100, &count
);
63 printf("read failed\n");
67 // printf("EOF count = %d\n", mCount);
73 printf("read: %s\n", buf
);
77 PRIntervalTime end
= PR_IntervalNow();
78 printf("read %d bytes, time = %dms\n", mCount
,
79 PR_IntervalToMilliseconds(end
- start
));
83 explicit nsReceiver(nsIInputStream
* in
)
84 : Runnable("nsReceiver"), mIn(in
), mCount(0) {}
86 uint32_t GetBytesRead() { return mCount
; }
89 ~nsReceiver() = default;
92 nsCOMPtr
<nsIInputStream
> mIn
;
96 static nsresult
TestPipe(nsIInputStream
* in
, nsIOutputStream
* out
) {
97 RefPtr
<nsReceiver
> receiver
= new nsReceiver(in
);
100 nsCOMPtr
<nsIThread
> thread
;
101 rv
= NS_NewNamedThread("TestPipe", getter_AddRefs(thread
), receiver
);
102 if (NS_FAILED(rv
)) return rv
;
105 PRIntervalTime start
= PR_IntervalNow();
106 for (uint32_t i
= 0; i
< ITERATIONS
; i
++) {
108 SmprintfPointer buf
= mozilla::Smprintf("%d %s", i
, kTestPattern
);
109 uint32_t len
= strlen(buf
.get());
110 rv
= WriteAll(out
, buf
.get(), len
, &writeCount
);
113 for (uint32_t j
= 0; j
< writeCount
; j
++) {
114 putc(buf
.get()[j
], stdout
);
118 if (NS_FAILED(rv
)) return rv
;
122 if (NS_FAILED(rv
)) return rv
;
124 PRIntervalTime end
= PR_IntervalNow();
128 printf("wrote %d bytes, time = %dms\n", total
,
129 PR_IntervalToMilliseconds(end
- start
));
130 EXPECT_EQ(receiver
->GetBytesRead(), total
);
135 ////////////////////////////////////////////////////////////////////////////////
// Reader half of the short-write test. TestShortWrites() runs it on its own
// thread, where it pulls data from mIn in chunks of up to 100 bytes. It also
// exposes Received() / WaitForReceipt(), both of which take the monitor held
// in mMon, so the writer can wait until the bytes it wrote have been read.
137 class nsShortReader final
: public Runnable
{
// Thread entry point: reads from mIn and logs what was read.
139 NS_IMETHOD
Run() override
{
146 // printf("calling Read\n");
147 rv
= mIn
->Read(buf
, 100, &count
);
149 printf("read failed\n");
157 // For next |printf()| call and possible others elsewhere.
160 printf("read %d bytes: %s\n", count
, buf
);
166 printf("read %d bytes\n", total
);
// The monitor is heap-allocated here and owned by this object; the
// destructor below releases it.
170 explicit nsShortReader(nsIInputStream
* in
)
171 : Runnable("nsShortReader"), mIn(in
), mReceived(0) {
172 mMon
= new ReentrantMonitor("nsShortReader");
// Called with the number of bytes just read; runs under the monitor.
175 void Received(uint32_t count
) {
176 ReentrantMonitorAutoEnter
mon(*mMon
);
// Loops, under the monitor, while the received count is below aWriteCount,
// expecting mReceived to have advanced on each pass.
181 uint32_t WaitForReceipt(const uint32_t aWriteCount
) {
182 ReentrantMonitorAutoEnter
mon(*mMon
);
183 uint32_t result
= mReceived
;
185 while (result
< aWriteCount
) {
188 EXPECT_TRUE(mReceived
> result
);
// Release the monitor allocated in the constructor. With a defaulted
// destructor the raw mMon pointer was never freed, leaking one
// ReentrantMonitor per reader.
197 ~nsShortReader() { delete mMon; }
200 nsCOMPtr
<nsIInputStream
> mIn
;
// Owned; created in the constructor, deleted in the destructor.
202 ReentrantMonitor
* mMon
;
// Drives the short-write scenario: starts an nsShortReader on its own thread
// ("TestShortWrites"), then performs ITERATIONS writes of a randomly
// shortened "%d %s" record through WriteAll(). Each write is expected to be
// complete (writeCount == len) and the reader is expected to report receipt
// of exactly that many bytes.
// Returns early with the failing nsresult when thread creation or a write
// fails.
205 static nsresult
TestShortWrites(nsIInputStream
* in
, nsIOutputStream
* out
) {
206 RefPtr
<nsShortReader
> receiver
= new nsShortReader(in
);
209 nsCOMPtr
<nsIThread
> thread
;
210 rv
= NS_NewNamedThread("TestShortWrites", getter_AddRefs(thread
), receiver
);
211 if (NS_FAILED(rv
)) return rv
;
214 for (uint32_t i
= 0; i
< ITERATIONS
; i
++) {
216 SmprintfPointer buf
= mozilla::Smprintf("%d %s", i
, kTestPattern
);
217 uint32_t len
= strlen(buf
.get());
// Scale the record to a random, shorter length.
// NOTE(review): len * rand() is a 32-bit unsigned product and can wrap when
// RAND_MAX is large, which skews the scaled length — consider widening to
// uint64_t here and in ChainedPipes.
218 len
= len
* rand() / RAND_MAX
;
// Always write at least one byte, as ChainedPipes does for the same
// expression. std::min(1u, len) here would cap every write at a single byte
// and throw away the random length computed above.
219 len
= std::max(1u, len
);
220 rv
= WriteAll(out
, buf
.get(), len
, &writeCount
);
221 if (NS_FAILED(rv
)) return rv
;
222 EXPECT_EQ(writeCount
, len
);
225 if (gTrace
) printf("wrote %d bytes: %s\n", writeCount
, buf
.get());
226 // printf("calling Flush\n");
228 // printf("calling WaitForReceipt\n");
// Block until the reader thread has consumed everything just written.
231 const uint32_t received
= receiver
->WaitForReceipt(writeCount
);
232 EXPECT_EQ(received
, writeCount
);
236 if (NS_FAILED(rv
)) return rv
;
240 printf("wrote %d bytes\n", total
);
245 ////////////////////////////////////////////////////////////////////////////////
247 class nsPump final
: public Runnable
{
249 NS_IMETHOD
Run() override
{
253 rv
= mOut
->WriteFrom(mIn
, ~0U, &count
);
255 printf("Write failed\n");
259 printf("EOF count = %d\n", mCount
);
264 printf("Wrote: %d\n", count
);
272 nsPump(nsIInputStream
* in
, nsIOutputStream
* out
)
273 : Runnable("nsPump"), mIn(in
), mOut(out
), mCount(0) {}
279 nsCOMPtr
<nsIInputStream
> mIn
;
280 nsCOMPtr
<nsIOutputStream
> mOut
;
284 TEST(Pipes
, ChainedPipes
)
288 printf("TestChainedPipes\n");
291 nsCOMPtr
<nsIInputStream
> in1
;
292 nsCOMPtr
<nsIOutputStream
> out1
;
293 NS_NewPipe(getter_AddRefs(in1
), getter_AddRefs(out1
), 20, 1999);
295 nsCOMPtr
<nsIInputStream
> in2
;
296 nsCOMPtr
<nsIOutputStream
> out2
;
297 NS_NewPipe(getter_AddRefs(in2
), getter_AddRefs(out2
), 200, 401);
299 RefPtr
<nsPump
> pump
= new nsPump(in1
, out2
);
300 if (pump
== nullptr) return;
302 nsCOMPtr
<nsIThread
> thread
;
303 rv
= NS_NewNamedThread("ChainedPipePump", getter_AddRefs(thread
), pump
);
304 if (NS_FAILED(rv
)) return;
306 RefPtr
<nsReceiver
> receiver
= new nsReceiver(in2
);
307 if (receiver
== nullptr) return;
309 nsCOMPtr
<nsIThread
> receiverThread
;
310 rv
= NS_NewNamedThread("ChainedPipeRecv", getter_AddRefs(receiverThread
),
312 if (NS_FAILED(rv
)) return;
315 for (uint32_t i
= 0; i
< ITERATIONS
; i
++) {
317 SmprintfPointer buf
= mozilla::Smprintf("%d %s", i
, kTestPattern
);
318 uint32_t len
= strlen(buf
.get());
319 len
= len
* rand() / RAND_MAX
;
320 len
= std::max(1u, len
);
321 rv
= WriteAll(out1
, buf
.get(), len
, &writeCount
);
322 if (NS_FAILED(rv
)) return;
323 EXPECT_EQ(writeCount
, len
);
326 if (gTrace
) printf("wrote %d bytes: %s\n", writeCount
, buf
.get());
329 printf("wrote total of %d bytes\n", total
);
332 if (NS_FAILED(rv
)) return;
335 receiverThread
->Shutdown();
338 ////////////////////////////////////////////////////////////////////////////////
340 static void RunTests(uint32_t segSize
, uint32_t segCount
) {
342 nsCOMPtr
<nsIInputStream
> in
;
343 nsCOMPtr
<nsIOutputStream
> out
;
344 uint32_t bufSize
= segSize
* segCount
;
346 printf("Testing New Pipes: segment size %d buffer size %d\n", segSize
,
348 printf("Testing long writes...\n");
350 NS_NewPipe(getter_AddRefs(in
), getter_AddRefs(out
), segSize
, bufSize
);
351 rv
= TestPipe(in
, out
);
352 EXPECT_NS_SUCCEEDED(rv
);
355 printf("Testing short writes...\n");
357 NS_NewPipe(getter_AddRefs(in
), getter_AddRefs(out
), segSize
, bufSize
);
358 rv
= TestShortWrites(in
, out
);
359 EXPECT_NS_SUCCEEDED(rv
);
368 ////////////////////////////////////////////////////////////////////////////////
// Default pipe segment size in bytes (4 KiB). TestPipe2() and TestPipeClone()
// use it when the caller does not pass aSegmentSize.
372 static const uint32_t DEFAULT_SEGMENT_SIZE
= 4 * 1024;
374 // An alternate pipe testing routine that uses NS_ConsumeStream() instead of
376 static void TestPipe2(uint32_t aNumBytes
,
377 uint32_t aSegmentSize
= DEFAULT_SEGMENT_SIZE
) {
378 nsCOMPtr
<nsIInputStream
> reader
;
379 nsCOMPtr
<nsIOutputStream
> writer
;
381 uint32_t maxSize
= std::max(aNumBytes
, aSegmentSize
);
383 NS_NewPipe(getter_AddRefs(reader
), getter_AddRefs(writer
), aSegmentSize
,
386 nsTArray
<char> inputData
;
387 testing::CreateData(aNumBytes
, inputData
);
388 testing::WriteAllAndClose(writer
, inputData
);
389 testing::ConsumeAndValidateStream(reader
, inputData
);
// Runs TestPipe2() with a 32 KiB payload and the default segment size.
394 TEST(Pipes
, Blocking_32k
)
395 { TestPipe2(32 * 1024); }
// Runs TestPipe2() with a 64 KiB payload and the default segment size.
397 TEST(Pipes
, Blocking_64k
)
398 { TestPipe2(64 * 1024); }
// Runs TestPipe2() with a 128 KiB payload and the default segment size.
400 TEST(Pipes
, Blocking_128k
)
401 { TestPipe2(128 * 1024); }
403 ////////////////////////////////////////////////////////////////////////////////
407 // Utility routine to validate pipe clone behavior. There are many knobs.
409 // aTotalBytes Total number of bytes to write to the pipe.
410 // aNumWrites How many separate write calls should be made. Bytes
411 // are evenly distributed over these write calls.
412 // aNumInitialClones How many clones of the pipe input stream should be
413 // made before writing begins.
414 // aNumToCloseAfterWrite How many streams should be closed after each write.
415 // One stream is always kept open. This verifies that
416 // closing one stream does not affect other open
418 // aNumToCloneAfterWrite How many clones to create after each write. Occurs
419 // after closing any streams. This tests cloning
420 // active streams on a pipe that is being written to.
421 // aNumStreamsToReadPerWrite How many streams to read fully after each write.
422 // This tests reading cloned streams at different rates
423 // while the pipe is being written to.
424 static void TestPipeClone(uint32_t aTotalBytes
, uint32_t aNumWrites
,
425 uint32_t aNumInitialClones
,
426 uint32_t aNumToCloseAfterWrite
,
427 uint32_t aNumToCloneAfterWrite
,
428 uint32_t aNumStreamsToReadPerWrite
,
429 uint32_t aSegmentSize
= DEFAULT_SEGMENT_SIZE
) {
430 nsCOMPtr
<nsIInputStream
> reader
;
431 nsCOMPtr
<nsIOutputStream
> writer
;
433 uint32_t maxSize
= std::max(aTotalBytes
, aSegmentSize
);
435 // Use async input streams so we can NS_ConsumeStream() the current data
436 // while the pipe is still being written to.
437 NS_NewPipe(getter_AddRefs(reader
), getter_AddRefs(writer
), aSegmentSize
,
438 maxSize
, true, false); // non-blocking - reader, writer
440 nsCOMPtr
<nsICloneableInputStream
> cloneable
= do_QueryInterface(reader
);
441 ASSERT_TRUE(cloneable
);
442 ASSERT_TRUE(cloneable
->GetCloneable());
444 nsTArray
<nsCString
> outputDataList
;
446 nsTArray
<nsCOMPtr
<nsIInputStream
>> streamList
;
448 // first stream is our original reader from the pipe
449 streamList
.AppendElement(reader
);
450 outputDataList
.AppendElement();
452 // Clone the initial input stream the specified number of times
453 // before performing any writes.
455 for (uint32_t i
= 0; i
< aNumInitialClones
; ++i
) {
456 nsCOMPtr
<nsIInputStream
>* clone
= streamList
.AppendElement();
457 rv
= cloneable
->Clone(getter_AddRefs(*clone
));
458 ASSERT_NS_SUCCEEDED(rv
);
461 outputDataList
.AppendElement();
464 nsTArray
<char> inputData
;
465 testing::CreateData(aTotalBytes
, inputData
);
467 const uint32_t bytesPerWrite
= ((aTotalBytes
- 1) / aNumWrites
) + 1;
469 uint32_t remaining
= aTotalBytes
;
470 uint32_t nextStreamToRead
= 0;
473 uint32_t numToWrite
= std::min(bytesPerWrite
, remaining
);
474 testing::Write(writer
, inputData
, offset
, numToWrite
);
475 offset
+= numToWrite
;
476 remaining
-= numToWrite
;
478 // Close the specified number of streams. This allows us to
479 // test that one closed clone does not break other open clones.
480 for (uint32_t i
= 0; i
< aNumToCloseAfterWrite
&& streamList
.Length() > 1;
482 uint32_t lastIndex
= streamList
.Length() - 1;
483 streamList
[lastIndex
]->Close();
484 streamList
.RemoveElementAt(lastIndex
);
485 outputDataList
.RemoveElementAt(lastIndex
);
487 if (nextStreamToRead
>= streamList
.Length()) {
488 nextStreamToRead
= 0;
492 // Create the specified number of clones. This lets us verify
493 // that we can create clones in the middle of pipe reading and
495 for (uint32_t i
= 0; i
< aNumToCloneAfterWrite
; ++i
) {
496 nsCOMPtr
<nsIInputStream
>* clone
= streamList
.AppendElement();
497 rv
= cloneable
->Clone(getter_AddRefs(*clone
));
498 ASSERT_NS_SUCCEEDED(rv
);
501 // Initialize the new output data to match what's been read to date for
502 // the original stream. First stream is always the original stream.
503 nsCString
* outputData
= outputDataList
.AppendElement();
504 *outputData
= outputDataList
[0];
507 // Read the specified number of streams. This lets us verify that we
508 // can read from the clones at different rates while the pipe is being
510 for (uint32_t i
= 0; i
< aNumStreamsToReadPerWrite
; ++i
) {
511 nsCOMPtr
<nsIInputStream
>& stream
= streamList
[nextStreamToRead
];
512 nsCString
& outputData
= outputDataList
[nextStreamToRead
];
514 // Can't use ConsumeAndValidateStream() here because we're not
515 // guaranteed the exact amount read. It should just be at least
516 // as many as numToWrite.
517 nsAutoCString tmpOutputData
;
518 rv
= NS_ConsumeStream(stream
, UINT32_MAX
, tmpOutputData
);
519 ASSERT_TRUE(rv
== NS_BASE_STREAM_WOULD_BLOCK
|| NS_SUCCEEDED(rv
));
520 ASSERT_GE(tmpOutputData
.Length(), numToWrite
);
522 outputData
+= tmpOutputData
;
524 nextStreamToRead
+= 1;
525 if (nextStreamToRead
>= streamList
.Length()) {
526 // Note: When we wrap around on the streams being read, it's possible
527 // we will trigger a segment to be deleted from the pipe. It
528 // would be nice to validate this here, but we don't have any
529 // QI'able interface that would let us check easily.
531 nextStreamToRead
= 0;
536 rv
= writer
->Close();
537 ASSERT_NS_SUCCEEDED(rv
);
539 nsDependentCSubstring
inputString(inputData
.Elements(), inputData
.Length());
541 // Finally, read the remaining bytes from each stream. This may be
542 // different amounts of data depending on how much reading we did while
543 // writing. Verify that the end result matches the input data.
544 for (uint32_t i
= 0; i
< streamList
.Length(); ++i
) {
545 nsCOMPtr
<nsIInputStream
>& stream
= streamList
[i
];
546 nsCString
& outputData
= outputDataList
[i
];
548 nsAutoCString tmpOutputData
;
549 rv
= NS_ConsumeStream(stream
, UINT32_MAX
, tmpOutputData
);
550 ASSERT_TRUE(rv
== NS_BASE_STREAM_WOULD_BLOCK
|| NS_SUCCEEDED(rv
));
553 // Append to total amount read from the stream
554 outputData
+= tmpOutputData
;
556 ASSERT_EQ(inputString
.Length(), outputData
.Length());
557 ASSERT_TRUE(inputString
.Equals(outputData
));
563 TEST(Pipes
, Clone_BeforeWrite_ReadAtEnd
)
565 TestPipeClone(32 * 1024, // total bytes
567 3, // num initial clones
568 0, // num streams to close after each write
569 0, // num clones to add after each write
570 0); // num streams to read after each write
573 TEST(Pipes
, Clone_BeforeWrite_ReadDuringWrite
)
575 // Since this reads all streams on every write, it should trigger the
576 // pipe cursor roll back optimization. Currently we can only verify
577 // this with logging.
579 TestPipeClone(32 * 1024, // total bytes
581 3, // num initial clones
582 0, // num streams to close after each write
583 0, // num clones to add after each write
584 4); // num streams to read after each write
587 TEST(Pipes
, Clone_DuringWrite_ReadAtEnd
)
589 TestPipeClone(32 * 1024, // total bytes
591 0, // num initial clones
592 0, // num streams to close after each write
593 1, // num clones to add after each write
594 0); // num streams to read after each write
597 TEST(Pipes
, Clone_DuringWrite_ReadDuringWrite
)
599 TestPipeClone(32 * 1024, // total bytes
601 0, // num initial clones
602 0, // num streams to close after each write
603 1, // num clones to add after each write
604 1); // num streams to read after each write
607 TEST(Pipes
, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite
)
609 // Since this reads streams faster than we clone new ones, it should
610 // trigger pipe segment deletion periodically. Currently we can
611 // only verify this with logging.
613 TestPipeClone(32 * 1024, // total bytes
615 1, // num initial clones
616 1, // num streams to close after each write
617 2, // num clones to add after each write
618 3); // num streams to read after each write
621 TEST(Pipes
, Write_AsyncWait
)
623 nsCOMPtr
<nsIAsyncInputStream
> reader
;
624 nsCOMPtr
<nsIAsyncOutputStream
> writer
;
626 const uint32_t segmentSize
= 1024;
627 const uint32_t numSegments
= 1;
629 NS_NewPipe2(getter_AddRefs(reader
), getter_AddRefs(writer
), true,
630 true, // non-blocking - reader, writer
631 segmentSize
, numSegments
);
633 nsTArray
<char> inputData
;
634 testing::CreateData(segmentSize
, inputData
);
636 uint32_t numWritten
= 0;
638 writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
639 ASSERT_NS_SUCCEEDED(rv
);
641 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
642 ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK
, rv
);
644 RefPtr
<testing::OutputStreamCallback
> cb
=
645 new testing::OutputStreamCallback();
647 rv
= writer
->AsyncWait(cb
, 0, 0, nullptr);
648 ASSERT_NS_SUCCEEDED(rv
);
650 ASSERT_FALSE(cb
->Called());
652 testing::ConsumeAndValidateStream(reader
, inputData
);
654 ASSERT_TRUE(cb
->Called());
657 TEST(Pipes
, Write_AsyncWait_Clone
)
659 nsCOMPtr
<nsIAsyncInputStream
> reader
;
660 nsCOMPtr
<nsIAsyncOutputStream
> writer
;
662 const uint32_t segmentSize
= 1024;
663 const uint32_t numSegments
= 1;
665 NS_NewPipe2(getter_AddRefs(reader
), getter_AddRefs(writer
), true,
666 true, // non-blocking - reader, writer
667 segmentSize
, numSegments
);
669 nsCOMPtr
<nsIInputStream
> clone
;
670 nsresult rv
= NS_CloneInputStream(reader
, getter_AddRefs(clone
));
671 ASSERT_NS_SUCCEEDED(rv
);
673 nsTArray
<char> inputData
;
674 testing::CreateData(segmentSize
, inputData
);
676 uint32_t numWritten
= 0;
677 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
678 ASSERT_NS_SUCCEEDED(rv
);
680 // This attempts to write data beyond the original pipe size limit. It
681 // should fail since neither side of the clone has been read yet.
682 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
683 ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK
, rv
);
685 RefPtr
<testing::OutputStreamCallback
> cb
=
686 new testing::OutputStreamCallback();
688 rv
= writer
->AsyncWait(cb
, 0, 0, nullptr);
689 ASSERT_NS_SUCCEEDED(rv
);
691 ASSERT_FALSE(cb
->Called());
693 // Consume data on the original stream, but the clone still has not been read.
694 testing::ConsumeAndValidateStream(reader
, inputData
);
696 // A clone that is not being read should not stall the other input stream
697 // reader. Therefore the writer callback should trigger when the fastest
698 // reader drains the other input stream.
699 ASSERT_TRUE(cb
->Called());
701 // Attempt to write data. This will buffer data beyond the pipe size limit in
702 // order for the clone stream to still work. This is allowed because the
703 // other input stream has drained its buffered segments and is ready for more
705 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
706 ASSERT_NS_SUCCEEDED(rv
);
708 // Again, this should fail since the original stream has not been read again.
709 // The pipe size should still restrict how far ahead we can buffer even
710 // when there is a cloned stream not being read.
711 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
712 ASSERT_NS_FAILED(rv
);
714 cb
= new testing::OutputStreamCallback();
715 rv
= writer
->AsyncWait(cb
, 0, 0, nullptr);
716 ASSERT_NS_SUCCEEDED(rv
);
718 // The write should again be blocked since we have written data and the
719 // main reader is at its maximum advance buffer.
720 ASSERT_FALSE(cb
->Called());
722 nsTArray
<char> expectedCloneData
;
723 expectedCloneData
.AppendElements(inputData
);
724 expectedCloneData
.AppendElements(inputData
);
726 // We should now be able to consume the entire backlog of buffered data on
727 // the cloned stream.
728 testing::ConsumeAndValidateStream(clone
, expectedCloneData
);
730 // Draining the clone side should also trigger the AsyncWait() writer
732 ASSERT_TRUE(cb
->Called());
734 // Finally, we should be able to consume the remaining data on the original
736 testing::ConsumeAndValidateStream(reader
, inputData
);
739 TEST(Pipes
, Write_AsyncWait_Clone_CloseOriginal
)
741 nsCOMPtr
<nsIAsyncInputStream
> reader
;
742 nsCOMPtr
<nsIAsyncOutputStream
> writer
;
744 const uint32_t segmentSize
= 1024;
745 const uint32_t numSegments
= 1;
747 NS_NewPipe2(getter_AddRefs(reader
), getter_AddRefs(writer
), true,
748 true, // non-blocking - reader, writer
749 segmentSize
, numSegments
);
751 nsCOMPtr
<nsIInputStream
> clone
;
752 nsresult rv
= NS_CloneInputStream(reader
, getter_AddRefs(clone
));
753 ASSERT_NS_SUCCEEDED(rv
);
755 nsTArray
<char> inputData
;
756 testing::CreateData(segmentSize
, inputData
);
758 uint32_t numWritten
= 0;
759 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
760 ASSERT_NS_SUCCEEDED(rv
);
762 // This attempts to write data beyond the original pipe size limit. It
763 // should fail since neither side of the clone has been read yet.
764 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
765 ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK
, rv
);
767 RefPtr
<testing::OutputStreamCallback
> cb
=
768 new testing::OutputStreamCallback();
770 rv
= writer
->AsyncWait(cb
, 0, 0, nullptr);
771 ASSERT_NS_SUCCEEDED(rv
);
773 ASSERT_FALSE(cb
->Called());
775 // Consume data on the original stream, but the clone still has not been read.
776 testing::ConsumeAndValidateStream(reader
, inputData
);
778 // A clone that is not being read should not stall the other input stream
779 // reader. Therefore the writer callback should trigger when the fastest
780 // reader drains the other input stream.
781 ASSERT_TRUE(cb
->Called());
783 // Attempt to write data. This will buffer data beyond the pipe size limit in
784 // order for the clone stream to still work. This is allowed because the
785 // other input stream has drained its buffered segments and is ready for more
787 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
788 ASSERT_NS_SUCCEEDED(rv
);
790 // Again, this should fail since the original stream has not been read again.
791 // The pipe size should still restrict how far ahead we can buffer even
792 // when there is a cloned stream not being read.
793 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
794 ASSERT_NS_FAILED(rv
);
796 cb
= new testing::OutputStreamCallback();
797 rv
= writer
->AsyncWait(cb
, 0, 0, nullptr);
798 ASSERT_NS_SUCCEEDED(rv
);
800 // The write should again be blocked since we have written data and the
801 // main reader is at its maximum advance buffer.
802 ASSERT_FALSE(cb
->Called());
804 // Close the original reader input stream. This was the fastest reader,
805 // so we should have a single stream that is buffered beyond our nominal
809 // Because the clone stream is still buffered the writable callback should
811 ASSERT_FALSE(cb
->Called());
813 // And we should not be able to perform a write.
814 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
815 ASSERT_NS_FAILED(rv
);
817 // Create another clone stream. Now we have two streams that exceed our
818 // maximum size limit
819 nsCOMPtr
<nsIInputStream
> clone2
;
820 rv
= NS_CloneInputStream(clone
, getter_AddRefs(clone2
));
821 ASSERT_NS_SUCCEEDED(rv
);
823 nsTArray
<char> expectedCloneData
;
824 expectedCloneData
.AppendElements(inputData
);
825 expectedCloneData
.AppendElements(inputData
);
827 // We should now be able to consume the entire backlog of buffered data on
828 // the cloned stream.
829 testing::ConsumeAndValidateStream(clone
, expectedCloneData
);
831 // The pipe should now be writable because we have two open streams, one of
832 // which is completely drained.
833 ASSERT_TRUE(cb
->Called());
835 // Write again to reach our limit again.
836 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
837 ASSERT_NS_SUCCEEDED(rv
);
839 // The stream is again non-writeable.
840 cb
= new testing::OutputStreamCallback();
841 rv
= writer
->AsyncWait(cb
, 0, 0, nullptr);
842 ASSERT_NS_SUCCEEDED(rv
);
843 ASSERT_FALSE(cb
->Called());
845 // Close the empty stream. This is different from our previous close since
846 // before we were closing a stream with some data still buffered.
849 // The pipe should not be writable. The second clone is still fully buffered
851 ASSERT_FALSE(cb
->Called());
852 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
853 ASSERT_NS_FAILED(rv
);
855 // Finally consume all of the buffered data on the second clone.
856 expectedCloneData
.AppendElements(inputData
);
857 testing::ConsumeAndValidateStream(clone2
, expectedCloneData
);
859 // Draining the final clone should make the pipe writable again.
860 ASSERT_TRUE(cb
->Called());
863 TEST(Pipes
, Read_AsyncWait
)
865 nsCOMPtr
<nsIAsyncInputStream
> reader
;
866 nsCOMPtr
<nsIAsyncOutputStream
> writer
;
868 const uint32_t segmentSize
= 1024;
869 const uint32_t numSegments
= 1;
871 NS_NewPipe2(getter_AddRefs(reader
), getter_AddRefs(writer
), true,
872 true, // non-blocking - reader, writer
873 segmentSize
, numSegments
);
875 nsTArray
<char> inputData
;
876 testing::CreateData(segmentSize
, inputData
);
878 RefPtr
<testing::InputStreamCallback
> cb
= new testing::InputStreamCallback();
880 nsresult rv
= reader
->AsyncWait(cb
, 0, 0, nullptr);
881 ASSERT_NS_SUCCEEDED(rv
);
883 ASSERT_FALSE(cb
->Called());
885 uint32_t numWritten
= 0;
886 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
887 ASSERT_NS_SUCCEEDED(rv
);
889 ASSERT_TRUE(cb
->Called());
891 testing::ConsumeAndValidateStream(reader
, inputData
);
894 TEST(Pipes
, Read_AsyncWait_Clone
)
896 nsCOMPtr
<nsIAsyncInputStream
> reader
;
897 nsCOMPtr
<nsIAsyncOutputStream
> writer
;
899 const uint32_t segmentSize
= 1024;
900 const uint32_t numSegments
= 1;
902 NS_NewPipe2(getter_AddRefs(reader
), getter_AddRefs(writer
), true,
903 true, // non-blocking - reader, writer
904 segmentSize
, numSegments
);
906 nsCOMPtr
<nsIInputStream
> clone
;
907 nsresult rv
= NS_CloneInputStream(reader
, getter_AddRefs(clone
));
908 ASSERT_NS_SUCCEEDED(rv
);
910 nsCOMPtr
<nsIAsyncInputStream
> asyncClone
= do_QueryInterface(clone
);
911 ASSERT_TRUE(asyncClone
);
913 nsTArray
<char> inputData
;
914 testing::CreateData(segmentSize
, inputData
);
916 RefPtr
<testing::InputStreamCallback
> cb
= new testing::InputStreamCallback();
918 RefPtr
<testing::InputStreamCallback
> cb2
= new testing::InputStreamCallback();
920 rv
= reader
->AsyncWait(cb
, 0, 0, nullptr);
921 ASSERT_NS_SUCCEEDED(rv
);
923 ASSERT_FALSE(cb
->Called());
925 rv
= asyncClone
->AsyncWait(cb2
, 0, 0, nullptr);
926 ASSERT_NS_SUCCEEDED(rv
);
928 ASSERT_FALSE(cb2
->Called());
930 uint32_t numWritten
= 0;
931 rv
= writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
932 ASSERT_NS_SUCCEEDED(rv
);
934 ASSERT_TRUE(cb
->Called());
935 ASSERT_TRUE(cb2
->Called());
937 testing::ConsumeAndValidateStream(reader
, inputData
);
942 nsresult
CloseDuringReadFunc(nsIInputStream
* aReader
, void* aClosure
,
943 const char* aFromSegment
, uint32_t aToOffset
,
944 uint32_t aCount
, uint32_t* aWriteCountOut
) {
945 MOZ_RELEASE_ASSERT(aReader
);
946 MOZ_RELEASE_ASSERT(aClosure
);
947 MOZ_RELEASE_ASSERT(aFromSegment
);
948 MOZ_RELEASE_ASSERT(aWriteCountOut
);
949 MOZ_RELEASE_ASSERT(aToOffset
== 0);
951 // This is insanity and you probably should not do this under normal
952 // conditions. We want to simulate the case where the pipe is closed
953 // (possibly from other end on another thread) simultaneously with the
954 // read. This is the easiest way to trigger this case in a synchronous
956 MOZ_ALWAYS_SUCCEEDS(aReader
->Close());
958 nsTArray
<char>* buffer
= static_cast<nsTArray
<char>*>(aClosure
);
959 buffer
->AppendElements(aFromSegment
, aCount
);
961 *aWriteCountOut
= aCount
;
966 void TestCloseDuringRead(uint32_t aSegmentSize
, uint32_t aDataSize
) {
967 nsCOMPtr
<nsIInputStream
> reader
;
968 nsCOMPtr
<nsIOutputStream
> writer
;
970 const uint32_t maxSize
= aSegmentSize
;
972 NS_NewPipe(getter_AddRefs(reader
), getter_AddRefs(writer
), aSegmentSize
,
975 nsTArray
<char> inputData
;
977 testing::CreateData(aDataSize
, inputData
);
979 uint32_t numWritten
= 0;
981 writer
->Write(inputData
.Elements(), inputData
.Length(), &numWritten
);
982 ASSERT_NS_SUCCEEDED(rv
);
984 nsTArray
<char> outputData
;
986 uint32_t numRead
= 0;
987 rv
= reader
->ReadSegments(CloseDuringReadFunc
, &outputData
,
988 inputData
.Length(), &numRead
);
989 ASSERT_NS_SUCCEEDED(rv
);
990 ASSERT_EQ(inputData
.Length(), numRead
);
992 ASSERT_EQ(inputData
, outputData
);
995 rv
= reader
->Available(&available
);
996 ASSERT_EQ(NS_BASE_STREAM_CLOSED
, rv
);
// Closes the reader from inside the read callback when the written data
// (512 bytes) fills only part of a 1024-byte segment.
1001 TEST(Pipes
, Close_During_Read_Partial_Segment
)
1002 { TestCloseDuringRead(1024, 512); }
// Closes the reader from inside the read callback when the written data
// (1024 bytes) fills a whole 1024-byte segment.
1004 TEST(Pipes
, Close_During_Read_Full_Segment
)
1005 { TestCloseDuringRead(1024, 1024); }
1007 TEST(Pipes
, Interfaces
)
1009 nsCOMPtr
<nsIInputStream
> reader
;
1010 nsCOMPtr
<nsIOutputStream
> writer
;
1012 NS_NewPipe(getter_AddRefs(reader
), getter_AddRefs(writer
));
1014 nsCOMPtr
<nsIAsyncInputStream
> readerType1
= do_QueryInterface(reader
);
1015 ASSERT_TRUE(readerType1
);
1017 nsCOMPtr
<nsITellableStream
> readerType2
= do_QueryInterface(reader
);
1018 ASSERT_TRUE(readerType2
);
1020 nsCOMPtr
<nsISearchableInputStream
> readerType3
= do_QueryInterface(reader
);
1021 ASSERT_TRUE(readerType3
);
1023 nsCOMPtr
<nsICloneableInputStream
> readerType4
= do_QueryInterface(reader
);
1024 ASSERT_TRUE(readerType4
);
1026 nsCOMPtr
<nsIClassInfo
> readerType5
= do_QueryInterface(reader
);
1027 ASSERT_TRUE(readerType5
);
1029 nsCOMPtr
<nsIBufferedInputStream
> readerType6
= do_QueryInterface(reader
);
1030 ASSERT_TRUE(readerType6
);