Bug 1883861 - Part 1: Move visitMemoryBarrier into the common CodeGenerator file...
[gecko.git] / xpcom / tests / gtest / TestPipes.cpp
bloba4f0ebc7a5c1affa2989554e63b8044552cbf9d0
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include <algorithm>
8 #include "gtest/gtest.h"
9 #include "Helpers.h"
10 #include "mozilla/gtest/MozAssertions.h"
11 #include "mozilla/ReentrantMonitor.h"
12 #include "mozilla/Printf.h"
13 #include "nsCOMPtr.h"
14 #include "nsCRT.h"
15 #include "nsIAsyncInputStream.h"
16 #include "nsIAsyncOutputStream.h"
17 #include "nsIBufferedStreams.h"
18 #include "nsIClassInfo.h"
19 #include "nsICloneableInputStream.h"
20 #include "nsIInputStream.h"
21 #include "nsIOutputStream.h"
22 #include "nsIPipe.h"
23 #include "nsITellableStream.h"
24 #include "nsIThread.h"
25 #include "nsIRunnable.h"
26 #include "nsStreamUtils.h"
27 #include "nsString.h"
28 #include "nsThreadUtils.h"
29 #include "prinrval.h"
31 using namespace mozilla;
// Number of formatted lines each writer loop pushes through a pipe.
#define ITERATIONS 33333
// Payload appended to the iteration counter for every line written.
char kTestPattern[] = "My hovercraft is full of eels.\n";

// When true, the tests print the data they read and write to stdout.
bool gTrace = false;
38 static nsresult WriteAll(nsIOutputStream* os, const char* buf, uint32_t bufLen,
39 uint32_t* lenWritten) {
40 const char* p = buf;
41 *lenWritten = 0;
42 while (bufLen) {
43 uint32_t n;
44 nsresult rv = os->Write(p, bufLen, &n);
45 if (NS_FAILED(rv)) return rv;
46 p += n;
47 bufLen -= n;
48 *lenWritten += n;
50 return NS_OK;
53 class nsReceiver final : public Runnable {
54 public:
55 NS_IMETHOD Run() override {
56 nsresult rv;
57 char buf[101];
58 uint32_t count;
59 PRIntervalTime start = PR_IntervalNow();
60 while (true) {
61 rv = mIn->Read(buf, 100, &count);
62 if (NS_FAILED(rv)) {
63 printf("read failed\n");
64 break;
66 if (count == 0) {
67 // printf("EOF count = %d\n", mCount);
68 break;
71 if (gTrace) {
72 buf[count] = '\0';
73 printf("read: %s\n", buf);
75 mCount += count;
77 PRIntervalTime end = PR_IntervalNow();
78 printf("read %d bytes, time = %dms\n", mCount,
79 PR_IntervalToMilliseconds(end - start));
80 return rv;
83 explicit nsReceiver(nsIInputStream* in)
84 : Runnable("nsReceiver"), mIn(in), mCount(0) {}
86 uint32_t GetBytesRead() { return mCount; }
88 private:
89 ~nsReceiver() = default;
91 protected:
92 nsCOMPtr<nsIInputStream> mIn;
93 uint32_t mCount;
96 static nsresult TestPipe(nsIInputStream* in, nsIOutputStream* out) {
97 RefPtr<nsReceiver> receiver = new nsReceiver(in);
98 nsresult rv;
100 nsCOMPtr<nsIThread> thread;
101 rv = NS_NewNamedThread("TestPipe", getter_AddRefs(thread), receiver);
102 if (NS_FAILED(rv)) return rv;
104 uint32_t total = 0;
105 PRIntervalTime start = PR_IntervalNow();
106 for (uint32_t i = 0; i < ITERATIONS; i++) {
107 uint32_t writeCount;
108 SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
109 uint32_t len = strlen(buf.get());
110 rv = WriteAll(out, buf.get(), len, &writeCount);
111 if (gTrace) {
112 printf("wrote: ");
113 for (uint32_t j = 0; j < writeCount; j++) {
114 putc(buf.get()[j], stdout);
116 printf("\n");
118 if (NS_FAILED(rv)) return rv;
119 total += writeCount;
121 rv = out->Close();
122 if (NS_FAILED(rv)) return rv;
124 PRIntervalTime end = PR_IntervalNow();
126 thread->Shutdown();
128 printf("wrote %d bytes, time = %dms\n", total,
129 PR_IntervalToMilliseconds(end - start));
130 EXPECT_EQ(receiver->GetBytesRead(), total);
132 return NS_OK;
135 ////////////////////////////////////////////////////////////////////////////////
137 class nsShortReader final : public Runnable {
138 public:
139 NS_IMETHOD Run() override {
140 nsresult rv;
141 char buf[101];
142 uint32_t count;
143 uint32_t total = 0;
144 while (true) {
145 // if (gTrace)
146 // printf("calling Read\n");
147 rv = mIn->Read(buf, 100, &count);
148 if (NS_FAILED(rv)) {
149 printf("read failed\n");
150 break;
152 if (count == 0) {
153 break;
156 if (gTrace) {
157 // For next |printf()| call and possible others elsewhere.
158 buf[count] = '\0';
160 printf("read %d bytes: %s\n", count, buf);
163 Received(count);
164 total += count;
166 printf("read %d bytes\n", total);
167 return rv;
170 explicit nsShortReader(nsIInputStream* in)
171 : Runnable("nsShortReader"), mIn(in), mReceived(0) {
172 mMon = new ReentrantMonitor("nsShortReader");
175 void Received(uint32_t count) {
176 ReentrantMonitorAutoEnter mon(*mMon);
177 mReceived += count;
178 mon.Notify();
181 uint32_t WaitForReceipt(const uint32_t aWriteCount) {
182 ReentrantMonitorAutoEnter mon(*mMon);
183 uint32_t result = mReceived;
185 while (result < aWriteCount) {
186 mon.Wait();
188 EXPECT_TRUE(mReceived > result);
189 result = mReceived;
192 mReceived = 0;
193 return result;
196 private:
197 ~nsShortReader() = default;
199 protected:
200 nsCOMPtr<nsIInputStream> mIn;
201 uint32_t mReceived;
202 ReentrantMonitor* mMon;
205 static nsresult TestShortWrites(nsIInputStream* in, nsIOutputStream* out) {
206 RefPtr<nsShortReader> receiver = new nsShortReader(in);
207 nsresult rv;
209 nsCOMPtr<nsIThread> thread;
210 rv = NS_NewNamedThread("TestShortWrites", getter_AddRefs(thread), receiver);
211 if (NS_FAILED(rv)) return rv;
213 uint32_t total = 0;
214 for (uint32_t i = 0; i < ITERATIONS; i++) {
215 uint32_t writeCount;
216 SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
217 uint32_t len = strlen(buf.get());
218 len = len * rand() / RAND_MAX;
219 len = std::min(1u, len);
220 rv = WriteAll(out, buf.get(), len, &writeCount);
221 if (NS_FAILED(rv)) return rv;
222 EXPECT_EQ(writeCount, len);
223 total += writeCount;
225 if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
226 // printf("calling Flush\n");
227 out->Flush();
228 // printf("calling WaitForReceipt\n");
230 #ifdef DEBUG
231 const uint32_t received = receiver->WaitForReceipt(writeCount);
232 EXPECT_EQ(received, writeCount);
233 #endif
235 rv = out->Close();
236 if (NS_FAILED(rv)) return rv;
238 thread->Shutdown();
240 printf("wrote %d bytes\n", total);
242 return NS_OK;
245 ////////////////////////////////////////////////////////////////////////////////
247 class nsPump final : public Runnable {
248 public:
249 NS_IMETHOD Run() override {
250 nsresult rv;
251 uint32_t count;
252 while (true) {
253 rv = mOut->WriteFrom(mIn, ~0U, &count);
254 if (NS_FAILED(rv)) {
255 printf("Write failed\n");
256 break;
258 if (count == 0) {
259 printf("EOF count = %d\n", mCount);
260 break;
263 if (gTrace) {
264 printf("Wrote: %d\n", count);
266 mCount += count;
268 mOut->Close();
269 return rv;
272 nsPump(nsIInputStream* in, nsIOutputStream* out)
273 : Runnable("nsPump"), mIn(in), mOut(out), mCount(0) {}
275 private:
276 ~nsPump() = default;
278 protected:
279 nsCOMPtr<nsIInputStream> mIn;
280 nsCOMPtr<nsIOutputStream> mOut;
281 uint32_t mCount;
284 TEST(Pipes, ChainedPipes)
286 nsresult rv;
287 if (gTrace) {
288 printf("TestChainedPipes\n");
291 nsCOMPtr<nsIInputStream> in1;
292 nsCOMPtr<nsIOutputStream> out1;
293 NS_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999);
295 nsCOMPtr<nsIInputStream> in2;
296 nsCOMPtr<nsIOutputStream> out2;
297 NS_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401);
299 RefPtr<nsPump> pump = new nsPump(in1, out2);
300 if (pump == nullptr) return;
302 nsCOMPtr<nsIThread> thread;
303 rv = NS_NewNamedThread("ChainedPipePump", getter_AddRefs(thread), pump);
304 if (NS_FAILED(rv)) return;
306 RefPtr<nsReceiver> receiver = new nsReceiver(in2);
307 if (receiver == nullptr) return;
309 nsCOMPtr<nsIThread> receiverThread;
310 rv = NS_NewNamedThread("ChainedPipeRecv", getter_AddRefs(receiverThread),
311 receiver);
312 if (NS_FAILED(rv)) return;
314 uint32_t total = 0;
315 for (uint32_t i = 0; i < ITERATIONS; i++) {
316 uint32_t writeCount;
317 SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
318 uint32_t len = strlen(buf.get());
319 len = len * rand() / RAND_MAX;
320 len = std::max(1u, len);
321 rv = WriteAll(out1, buf.get(), len, &writeCount);
322 if (NS_FAILED(rv)) return;
323 EXPECT_EQ(writeCount, len);
324 total += writeCount;
326 if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
328 if (gTrace) {
329 printf("wrote total of %d bytes\n", total);
331 rv = out1->Close();
332 if (NS_FAILED(rv)) return;
334 thread->Shutdown();
335 receiverThread->Shutdown();
338 ////////////////////////////////////////////////////////////////////////////////
340 static void RunTests(uint32_t segSize, uint32_t segCount) {
341 nsresult rv;
342 nsCOMPtr<nsIInputStream> in;
343 nsCOMPtr<nsIOutputStream> out;
344 uint32_t bufSize = segSize * segCount;
345 if (gTrace) {
346 printf("Testing New Pipes: segment size %d buffer size %d\n", segSize,
347 bufSize);
348 printf("Testing long writes...\n");
350 NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
351 rv = TestPipe(in, out);
352 EXPECT_NS_SUCCEEDED(rv);
354 if (gTrace) {
355 printf("Testing short writes...\n");
357 NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
358 rv = TestShortWrites(in, out);
359 EXPECT_NS_SUCCEEDED(rv);
362 TEST(Pipes, Main)
364 RunTests(16, 1);
365 RunTests(4096, 16);
368 ////////////////////////////////////////////////////////////////////////////////
370 namespace {
372 static const uint32_t DEFAULT_SEGMENT_SIZE = 4 * 1024;
374 // An alternate pipe testing routing that uses NS_ConsumeStream() instead of
375 // manual read loop.
376 static void TestPipe2(uint32_t aNumBytes,
377 uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
378 nsCOMPtr<nsIInputStream> reader;
379 nsCOMPtr<nsIOutputStream> writer;
381 uint32_t maxSize = std::max(aNumBytes, aSegmentSize);
383 NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), aSegmentSize,
384 maxSize);
386 nsTArray<char> inputData;
387 testing::CreateData(aNumBytes, inputData);
388 testing::WriteAllAndClose(writer, inputData);
389 testing::ConsumeAndValidateStream(reader, inputData);
392 } // namespace
394 TEST(Pipes, Blocking_32k)
395 { TestPipe2(32 * 1024); }
397 TEST(Pipes, Blocking_64k)
398 { TestPipe2(64 * 1024); }
400 TEST(Pipes, Blocking_128k)
401 { TestPipe2(128 * 1024); }
403 ////////////////////////////////////////////////////////////////////////////////
405 namespace {
407 // Utility routine to validate pipe clone before. There are many knobs.
409 // aTotalBytes Total number of bytes to write to the pipe.
410 // aNumWrites How many separate write calls should be made. Bytes
411 // are evenly distributed over these write calls.
412 // aNumInitialClones How many clones of the pipe input stream should be
413 // made before writing begins.
414 // aNumToCloseAfterWrite How many streams should be closed after each write.
415 // One stream is always kept open. This verifies that
416 // closing one stream does not effect other open
417 // streams.
418 // aNumToCloneAfterWrite How many clones to create after each write. Occurs
419 // after closing any streams. This tests cloning
420 // active streams on a pipe that is being written to.
421 // aNumStreamToReadPerWrite How many streams to read fully after each write.
422 // This tests reading cloned streams at different rates
423 // while the pipe is being written to.
424 static void TestPipeClone(uint32_t aTotalBytes, uint32_t aNumWrites,
425 uint32_t aNumInitialClones,
426 uint32_t aNumToCloseAfterWrite,
427 uint32_t aNumToCloneAfterWrite,
428 uint32_t aNumStreamsToReadPerWrite,
429 uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
430 nsCOMPtr<nsIInputStream> reader;
431 nsCOMPtr<nsIOutputStream> writer;
433 uint32_t maxSize = std::max(aTotalBytes, aSegmentSize);
435 // Use async input streams so we can NS_ConsumeStream() the current data
436 // while the pipe is still being written to.
437 NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), aSegmentSize,
438 maxSize, true, false); // non-blocking - reader, writer
440 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(reader);
441 ASSERT_TRUE(cloneable);
442 ASSERT_TRUE(cloneable->GetCloneable());
444 nsTArray<nsCString> outputDataList;
446 nsTArray<nsCOMPtr<nsIInputStream>> streamList;
448 // first stream is our original reader from the pipe
449 streamList.AppendElement(reader);
450 outputDataList.AppendElement();
452 // Clone the initial input stream the specified number of times
453 // before performing any writes.
454 nsresult rv;
455 for (uint32_t i = 0; i < aNumInitialClones; ++i) {
456 nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
457 rv = cloneable->Clone(getter_AddRefs(*clone));
458 ASSERT_NS_SUCCEEDED(rv);
459 ASSERT_TRUE(*clone);
461 outputDataList.AppendElement();
464 nsTArray<char> inputData;
465 testing::CreateData(aTotalBytes, inputData);
467 const uint32_t bytesPerWrite = ((aTotalBytes - 1) / aNumWrites) + 1;
468 uint32_t offset = 0;
469 uint32_t remaining = aTotalBytes;
470 uint32_t nextStreamToRead = 0;
472 while (remaining) {
473 uint32_t numToWrite = std::min(bytesPerWrite, remaining);
474 testing::Write(writer, inputData, offset, numToWrite);
475 offset += numToWrite;
476 remaining -= numToWrite;
478 // Close the specified number of streams. This allows us to
479 // test that one closed clone does not break other open clones.
480 for (uint32_t i = 0; i < aNumToCloseAfterWrite && streamList.Length() > 1;
481 ++i) {
482 uint32_t lastIndex = streamList.Length() - 1;
483 streamList[lastIndex]->Close();
484 streamList.RemoveElementAt(lastIndex);
485 outputDataList.RemoveElementAt(lastIndex);
487 if (nextStreamToRead >= streamList.Length()) {
488 nextStreamToRead = 0;
492 // Create the specified number of clones. This lets us verify
493 // that we can create clones in the middle of pipe reading and
494 // writing.
495 for (uint32_t i = 0; i < aNumToCloneAfterWrite; ++i) {
496 nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
497 rv = cloneable->Clone(getter_AddRefs(*clone));
498 ASSERT_NS_SUCCEEDED(rv);
499 ASSERT_TRUE(*clone);
501 // Initialize the new output data to make whats been read to data for
502 // the original stream. First stream is always the original stream.
503 nsCString* outputData = outputDataList.AppendElement();
504 *outputData = outputDataList[0];
507 // Read the specified number of streams. This lets us verify that we
508 // can read from the clones at different rates while the pipe is being
509 // written to.
510 for (uint32_t i = 0; i < aNumStreamsToReadPerWrite; ++i) {
511 nsCOMPtr<nsIInputStream>& stream = streamList[nextStreamToRead];
512 nsCString& outputData = outputDataList[nextStreamToRead];
514 // Can't use ConsumeAndValidateStream() here because we're not
515 // guaranteed the exact amount read. It should just be at least
516 // as many as numToWrite.
517 nsAutoCString tmpOutputData;
518 rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
519 ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
520 ASSERT_GE(tmpOutputData.Length(), numToWrite);
522 outputData += tmpOutputData;
524 nextStreamToRead += 1;
525 if (nextStreamToRead >= streamList.Length()) {
526 // Note: When we wrap around on the streams being read, its possible
527 // we will trigger a segment to be deleted from the pipe. It
528 // would be nice to validate this here, but we don't have any
529 // QI'able interface that would let us check easily.
531 nextStreamToRead = 0;
536 rv = writer->Close();
537 ASSERT_NS_SUCCEEDED(rv);
539 nsDependentCSubstring inputString(inputData.Elements(), inputData.Length());
541 // Finally, read the remaining bytes from each stream. This may be
542 // different amounts of data depending on how much reading we did while
543 // writing. Verify that the end result matches the input data.
544 for (uint32_t i = 0; i < streamList.Length(); ++i) {
545 nsCOMPtr<nsIInputStream>& stream = streamList[i];
546 nsCString& outputData = outputDataList[i];
548 nsAutoCString tmpOutputData;
549 rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
550 ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
551 stream->Close();
553 // Append to total amount read from the stream
554 outputData += tmpOutputData;
556 ASSERT_EQ(inputString.Length(), outputData.Length());
557 ASSERT_TRUE(inputString.Equals(outputData));
561 } // namespace
563 TEST(Pipes, Clone_BeforeWrite_ReadAtEnd)
565 TestPipeClone(32 * 1024, // total bytes
566 16, // num writes
567 3, // num initial clones
568 0, // num streams to close after each write
569 0, // num clones to add after each write
570 0); // num streams to read after each write
573 TEST(Pipes, Clone_BeforeWrite_ReadDuringWrite)
575 // Since this reads all streams on every write, it should trigger the
576 // pipe cursor roll back optimization. Currently we can only verify
577 // this with logging.
579 TestPipeClone(32 * 1024, // total bytes
580 16, // num writes
581 3, // num initial clones
582 0, // num streams to close after each write
583 0, // num clones to add after each write
584 4); // num streams to read after each write
587 TEST(Pipes, Clone_DuringWrite_ReadAtEnd)
589 TestPipeClone(32 * 1024, // total bytes
590 16, // num writes
591 0, // num initial clones
592 0, // num streams to close after each write
593 1, // num clones to add after each write
594 0); // num streams to read after each write
597 TEST(Pipes, Clone_DuringWrite_ReadDuringWrite)
599 TestPipeClone(32 * 1024, // total bytes
600 16, // num writes
601 0, // num initial clones
602 0, // num streams to close after each write
603 1, // num clones to add after each write
604 1); // num streams to read after each write
607 TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)
609 // Since this reads streams faster than we clone new ones, it should
610 // trigger pipe segment deletion periodically. Currently we can
611 // only verify this with logging.
613 TestPipeClone(32 * 1024, // total bytes
614 16, // num writes
615 1, // num initial clones
616 1, // num streams to close after each write
617 2, // num clones to add after each write
618 3); // num streams to read after each write
621 TEST(Pipes, Write_AsyncWait)
623 nsCOMPtr<nsIAsyncInputStream> reader;
624 nsCOMPtr<nsIAsyncOutputStream> writer;
626 const uint32_t segmentSize = 1024;
627 const uint32_t numSegments = 1;
629 NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), true,
630 true, // non-blocking - reader, writer
631 segmentSize, numSegments);
633 nsTArray<char> inputData;
634 testing::CreateData(segmentSize, inputData);
636 uint32_t numWritten = 0;
637 nsresult rv =
638 writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
639 ASSERT_NS_SUCCEEDED(rv);
641 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
642 ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
644 RefPtr<testing::OutputStreamCallback> cb =
645 new testing::OutputStreamCallback();
647 rv = writer->AsyncWait(cb, 0, 0, nullptr);
648 ASSERT_NS_SUCCEEDED(rv);
650 ASSERT_FALSE(cb->Called());
652 testing::ConsumeAndValidateStream(reader, inputData);
654 ASSERT_TRUE(cb->Called());
657 TEST(Pipes, Write_AsyncWait_Clone)
659 nsCOMPtr<nsIAsyncInputStream> reader;
660 nsCOMPtr<nsIAsyncOutputStream> writer;
662 const uint32_t segmentSize = 1024;
663 const uint32_t numSegments = 1;
665 NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), true,
666 true, // non-blocking - reader, writer
667 segmentSize, numSegments);
669 nsCOMPtr<nsIInputStream> clone;
670 nsresult rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
671 ASSERT_NS_SUCCEEDED(rv);
673 nsTArray<char> inputData;
674 testing::CreateData(segmentSize, inputData);
676 uint32_t numWritten = 0;
677 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
678 ASSERT_NS_SUCCEEDED(rv);
680 // This attempts to write data beyond the original pipe size limit. It
681 // should fail since neither side of the clone has been read yet.
682 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
683 ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
685 RefPtr<testing::OutputStreamCallback> cb =
686 new testing::OutputStreamCallback();
688 rv = writer->AsyncWait(cb, 0, 0, nullptr);
689 ASSERT_NS_SUCCEEDED(rv);
691 ASSERT_FALSE(cb->Called());
693 // Consume data on the original stream, but the clone still has not been read.
694 testing::ConsumeAndValidateStream(reader, inputData);
696 // A clone that is not being read should not stall the other input stream
697 // reader. Therefore the writer callback should trigger when the fastest
698 // reader drains the other input stream.
699 ASSERT_TRUE(cb->Called());
701 // Attempt to write data. This will buffer data beyond the pipe size limit in
702 // order for the clone stream to still work. This is allowed because the
703 // other input stream has drained its buffered segments and is ready for more
704 // data.
705 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
706 ASSERT_NS_SUCCEEDED(rv);
708 // Again, this should fail since the origin stream has not been read again.
709 // The pipe size should still restrict how far ahead we can buffer even
710 // when there is a cloned stream not being read.
711 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
712 ASSERT_NS_FAILED(rv);
714 cb = new testing::OutputStreamCallback();
715 rv = writer->AsyncWait(cb, 0, 0, nullptr);
716 ASSERT_NS_SUCCEEDED(rv);
718 // The write should again be blocked since we have written data and the
719 // main reader is at its maximum advance buffer.
720 ASSERT_FALSE(cb->Called());
722 nsTArray<char> expectedCloneData;
723 expectedCloneData.AppendElements(inputData);
724 expectedCloneData.AppendElements(inputData);
726 // We should now be able to consume the entire backlog of buffered data on
727 // the cloned stream.
728 testing::ConsumeAndValidateStream(clone, expectedCloneData);
730 // Draining the clone side should also trigger the AsyncWait() writer
731 // callback
732 ASSERT_TRUE(cb->Called());
734 // Finally, we should be able to consume the remaining data on the original
735 // reader.
736 testing::ConsumeAndValidateStream(reader, inputData);
739 TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal)
741 nsCOMPtr<nsIAsyncInputStream> reader;
742 nsCOMPtr<nsIAsyncOutputStream> writer;
744 const uint32_t segmentSize = 1024;
745 const uint32_t numSegments = 1;
747 NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), true,
748 true, // non-blocking - reader, writer
749 segmentSize, numSegments);
751 nsCOMPtr<nsIInputStream> clone;
752 nsresult rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
753 ASSERT_NS_SUCCEEDED(rv);
755 nsTArray<char> inputData;
756 testing::CreateData(segmentSize, inputData);
758 uint32_t numWritten = 0;
759 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
760 ASSERT_NS_SUCCEEDED(rv);
762 // This attempts to write data beyond the original pipe size limit. It
763 // should fail since neither side of the clone has been read yet.
764 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
765 ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
767 RefPtr<testing::OutputStreamCallback> cb =
768 new testing::OutputStreamCallback();
770 rv = writer->AsyncWait(cb, 0, 0, nullptr);
771 ASSERT_NS_SUCCEEDED(rv);
773 ASSERT_FALSE(cb->Called());
775 // Consume data on the original stream, but the clone still has not been read.
776 testing::ConsumeAndValidateStream(reader, inputData);
778 // A clone that is not being read should not stall the other input stream
779 // reader. Therefore the writer callback should trigger when the fastest
780 // reader drains the other input stream.
781 ASSERT_TRUE(cb->Called());
783 // Attempt to write data. This will buffer data beyond the pipe size limit in
784 // order for the clone stream to still work. This is allowed because the
785 // other input stream has drained its buffered segments and is ready for more
786 // data.
787 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
788 ASSERT_NS_SUCCEEDED(rv);
790 // Again, this should fail since the origin stream has not been read again.
791 // The pipe size should still restrict how far ahead we can buffer even
792 // when there is a cloned stream not being read.
793 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
794 ASSERT_NS_FAILED(rv);
796 cb = new testing::OutputStreamCallback();
797 rv = writer->AsyncWait(cb, 0, 0, nullptr);
798 ASSERT_NS_SUCCEEDED(rv);
800 // The write should again be blocked since we have written data and the
801 // main reader is at its maximum advance buffer.
802 ASSERT_FALSE(cb->Called());
804 // Close the original reader input stream. This was the fastest reader,
805 // so we should have a single stream that is buffered beyond our nominal
806 // limit.
807 reader->Close();
809 // Because the clone stream is still buffered the writable callback should
810 // not be fired.
811 ASSERT_FALSE(cb->Called());
813 // And we should not be able to perform a write.
814 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
815 ASSERT_NS_FAILED(rv);
817 // Create another clone stream. Now we have two streams that exceed our
818 // maximum size limit
819 nsCOMPtr<nsIInputStream> clone2;
820 rv = NS_CloneInputStream(clone, getter_AddRefs(clone2));
821 ASSERT_NS_SUCCEEDED(rv);
823 nsTArray<char> expectedCloneData;
824 expectedCloneData.AppendElements(inputData);
825 expectedCloneData.AppendElements(inputData);
827 // We should now be able to consume the entire backlog of buffered data on
828 // the cloned stream.
829 testing::ConsumeAndValidateStream(clone, expectedCloneData);
831 // The pipe should now be writable because we have two open streams, one of
832 // which is completely drained.
833 ASSERT_TRUE(cb->Called());
835 // Write again to reach our limit again.
836 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
837 ASSERT_NS_SUCCEEDED(rv);
839 // The stream is again non-writeable.
840 cb = new testing::OutputStreamCallback();
841 rv = writer->AsyncWait(cb, 0, 0, nullptr);
842 ASSERT_NS_SUCCEEDED(rv);
843 ASSERT_FALSE(cb->Called());
845 // Close the empty stream. This is different from our previous close since
846 // before we were closing a stream with some data still buffered.
847 clone->Close();
849 // The pipe should not be writable. The second clone is still fully buffered
850 // over our limit.
851 ASSERT_FALSE(cb->Called());
852 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
853 ASSERT_NS_FAILED(rv);
855 // Finally consume all of the buffered data on the second clone.
856 expectedCloneData.AppendElements(inputData);
857 testing::ConsumeAndValidateStream(clone2, expectedCloneData);
859 // Draining the final clone should make the pipe writable again.
860 ASSERT_TRUE(cb->Called());
863 TEST(Pipes, Read_AsyncWait)
865 nsCOMPtr<nsIAsyncInputStream> reader;
866 nsCOMPtr<nsIAsyncOutputStream> writer;
868 const uint32_t segmentSize = 1024;
869 const uint32_t numSegments = 1;
871 NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), true,
872 true, // non-blocking - reader, writer
873 segmentSize, numSegments);
875 nsTArray<char> inputData;
876 testing::CreateData(segmentSize, inputData);
878 RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
880 nsresult rv = reader->AsyncWait(cb, 0, 0, nullptr);
881 ASSERT_NS_SUCCEEDED(rv);
883 ASSERT_FALSE(cb->Called());
885 uint32_t numWritten = 0;
886 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
887 ASSERT_NS_SUCCEEDED(rv);
889 ASSERT_TRUE(cb->Called());
891 testing::ConsumeAndValidateStream(reader, inputData);
894 TEST(Pipes, Read_AsyncWait_Clone)
896 nsCOMPtr<nsIAsyncInputStream> reader;
897 nsCOMPtr<nsIAsyncOutputStream> writer;
899 const uint32_t segmentSize = 1024;
900 const uint32_t numSegments = 1;
902 NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), true,
903 true, // non-blocking - reader, writer
904 segmentSize, numSegments);
906 nsCOMPtr<nsIInputStream> clone;
907 nsresult rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
908 ASSERT_NS_SUCCEEDED(rv);
910 nsCOMPtr<nsIAsyncInputStream> asyncClone = do_QueryInterface(clone);
911 ASSERT_TRUE(asyncClone);
913 nsTArray<char> inputData;
914 testing::CreateData(segmentSize, inputData);
916 RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
918 RefPtr<testing::InputStreamCallback> cb2 = new testing::InputStreamCallback();
920 rv = reader->AsyncWait(cb, 0, 0, nullptr);
921 ASSERT_NS_SUCCEEDED(rv);
923 ASSERT_FALSE(cb->Called());
925 rv = asyncClone->AsyncWait(cb2, 0, 0, nullptr);
926 ASSERT_NS_SUCCEEDED(rv);
928 ASSERT_FALSE(cb2->Called());
930 uint32_t numWritten = 0;
931 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
932 ASSERT_NS_SUCCEEDED(rv);
934 ASSERT_TRUE(cb->Called());
935 ASSERT_TRUE(cb2->Called());
937 testing::ConsumeAndValidateStream(reader, inputData);
940 namespace {
942 nsresult CloseDuringReadFunc(nsIInputStream* aReader, void* aClosure,
943 const char* aFromSegment, uint32_t aToOffset,
944 uint32_t aCount, uint32_t* aWriteCountOut) {
945 MOZ_RELEASE_ASSERT(aReader);
946 MOZ_RELEASE_ASSERT(aClosure);
947 MOZ_RELEASE_ASSERT(aFromSegment);
948 MOZ_RELEASE_ASSERT(aWriteCountOut);
949 MOZ_RELEASE_ASSERT(aToOffset == 0);
951 // This is insanity and you probably should not do this under normal
952 // conditions. We want to simulate the case where the pipe is closed
953 // (possibly from other end on another thread) simultaneously with the
954 // read. This is the easiest way to do trigger this case in a synchronous
955 // gtest.
956 MOZ_ALWAYS_SUCCEEDS(aReader->Close());
958 nsTArray<char>* buffer = static_cast<nsTArray<char>*>(aClosure);
959 buffer->AppendElements(aFromSegment, aCount);
961 *aWriteCountOut = aCount;
963 return NS_OK;
966 void TestCloseDuringRead(uint32_t aSegmentSize, uint32_t aDataSize) {
967 nsCOMPtr<nsIInputStream> reader;
968 nsCOMPtr<nsIOutputStream> writer;
970 const uint32_t maxSize = aSegmentSize;
972 NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), aSegmentSize,
973 maxSize);
975 nsTArray<char> inputData;
977 testing::CreateData(aDataSize, inputData);
979 uint32_t numWritten = 0;
980 nsresult rv =
981 writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
982 ASSERT_NS_SUCCEEDED(rv);
984 nsTArray<char> outputData;
986 uint32_t numRead = 0;
987 rv = reader->ReadSegments(CloseDuringReadFunc, &outputData,
988 inputData.Length(), &numRead);
989 ASSERT_NS_SUCCEEDED(rv);
990 ASSERT_EQ(inputData.Length(), numRead);
992 ASSERT_EQ(inputData, outputData);
994 uint64_t available;
995 rv = reader->Available(&available);
996 ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv);
999 } // namespace
1001 TEST(Pipes, Close_During_Read_Partial_Segment)
1002 { TestCloseDuringRead(1024, 512); }
1004 TEST(Pipes, Close_During_Read_Full_Segment)
1005 { TestCloseDuringRead(1024, 1024); }
1007 TEST(Pipes, Interfaces)
1009 nsCOMPtr<nsIInputStream> reader;
1010 nsCOMPtr<nsIOutputStream> writer;
1012 NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer));
1014 nsCOMPtr<nsIAsyncInputStream> readerType1 = do_QueryInterface(reader);
1015 ASSERT_TRUE(readerType1);
1017 nsCOMPtr<nsITellableStream> readerType2 = do_QueryInterface(reader);
1018 ASSERT_TRUE(readerType2);
1020 nsCOMPtr<nsISearchableInputStream> readerType3 = do_QueryInterface(reader);
1021 ASSERT_TRUE(readerType3);
1023 nsCOMPtr<nsICloneableInputStream> readerType4 = do_QueryInterface(reader);
1024 ASSERT_TRUE(readerType4);
1026 nsCOMPtr<nsIClassInfo> readerType5 = do_QueryInterface(reader);
1027 ASSERT_TRUE(readerType5);
1029 nsCOMPtr<nsIBufferedInputStream> readerType6 = do_QueryInterface(reader);
1030 ASSERT_TRUE(readerType6);