1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 #include "mozilla/Logging.h"
10 #include "mozilla/Maybe.h"
11 #include "mozilla/Mutex.h"
12 #include "mozilla/Attributes.h"
13 #include "mozilla/IntegerPrintfMacros.h"
14 #include "nsIInputStreamTee.h"
15 #include "nsIInputStream.h"
16 #include "nsIOutputStream.h"
18 #include "nsIEventTarget.h"
19 #include "nsThreadUtils.h"
21 using namespace mozilla
;
27 static LazyLogModule
sTeeLog("nsInputStreamTee");
28 #define LOG(args) MOZ_LOG(sTeeLog, mozilla::LogLevel::Debug, args)
30 class nsInputStreamTee final
: public nsIInputStreamTee
{
32 NS_DECL_THREADSAFE_ISUPPORTS
33 NS_DECL_NSIINPUTSTREAM
34 NS_DECL_NSIINPUTSTREAMTEE
38 void InvalidateSink();
41 ~nsInputStreamTee() = default;
43 nsresult
TeeSegment(const char* aBuf
, uint32_t aCount
);
45 static nsresult
WriteSegmentFun(nsIInputStream
*, void*, const char*, uint32_t,
49 nsCOMPtr
<nsIInputStream
> mSource
;
50 nsCOMPtr
<nsIOutputStream
> mSink
;
51 nsCOMPtr
<nsIEventTarget
> mEventTarget
;
52 nsWriteSegmentFun mWriter
; // for implementing ReadSegments
53 void* mClosure
; // for implementing ReadSegments
54 Maybe
<Mutex
> mLock
; // synchronize access to mSinkIsValid
55 bool mSinkIsValid
; // False if TeeWriteEvent fails
58 class nsInputStreamTeeWriteEvent
: public Runnable
{
60 // aTee's lock is held across construction of this object
61 nsInputStreamTeeWriteEvent(const char* aBuf
, uint32_t aCount
,
62 nsIOutputStream
* aSink
, nsInputStreamTee
* aTee
)
63 : mozilla::Runnable("nsInputStreamTeeWriteEvent") {
64 // copy the buffer - will be free'd by dtor
65 mBuf
= (char*)malloc(aCount
);
67 memcpy(mBuf
, (char*)aBuf
, aCount
);
72 mSink
->IsNonBlocking(&isNonBlocking
);
73 NS_ASSERTION(isNonBlocking
== false, "mSink is nonblocking");
77 NS_IMETHOD
Run() override
{
80 "nsInputStreamTeeWriteEvent::Run() "
81 "memory not allocated\n");
84 MOZ_ASSERT(mSink
, "mSink is null!");
86 // The output stream could have been invalidated between when
87 // this event was dispatched and now, so check before writing.
88 if (!mTee
->SinkIsValid()) {
93 ("nsInputStreamTeeWriteEvent::Run() [%p]"
94 "will write %u bytes to %p\n",
95 this, mCount
, mSink
.get()));
97 uint32_t totalBytesWritten
= 0;
100 uint32_t bytesWritten
= 0;
101 rv
= mSink
->Write(mBuf
+ totalBytesWritten
, mCount
, &bytesWritten
);
103 LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %" PRIx32
" in writing",
104 this, static_cast<uint32_t>(rv
)));
105 mTee
->InvalidateSink();
108 totalBytesWritten
+= bytesWritten
;
109 NS_ASSERTION(bytesWritten
<= mCount
, "wrote too much");
110 mCount
-= bytesWritten
;
116 virtual ~nsInputStreamTeeWriteEvent() {
126 nsCOMPtr
<nsIOutputStream
> mSink
;
127 // back pointer to the tee that created this runnable
128 RefPtr
<nsInputStreamTee
> mTee
;
131 nsInputStreamTee::nsInputStreamTee()
132 : mWriter(nullptr), mClosure(nullptr), mSinkIsValid(true) {}
134 bool nsInputStreamTee::SinkIsValid() {
135 MutexAutoLock
lock(*mLock
);
139 void nsInputStreamTee::InvalidateSink() {
140 MutexAutoLock
lock(*mLock
);
141 mSinkIsValid
= false;
144 nsresult
nsInputStreamTee::TeeSegment(const char* aBuf
, uint32_t aCount
) {
146 return NS_OK
; // nothing to do
148 if (mLock
) { // asynchronous case
149 NS_ASSERTION(mEventTarget
, "mEventTarget is null, mLock is not null.");
150 if (!SinkIsValid()) {
151 return NS_OK
; // nothing to do
153 nsCOMPtr
<nsIRunnable
> event
=
154 new nsInputStreamTeeWriteEvent(aBuf
, aCount
, mSink
, this);
155 LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n", this,
157 return mEventTarget
->Dispatch(event
, NS_DISPATCH_NORMAL
);
158 } else { // synchronous case
159 NS_ASSERTION(!mEventTarget
, "mEventTarget is not null, mLock is null.");
161 uint32_t totalBytesWritten
= 0;
163 uint32_t bytesWritten
= 0;
164 rv
= mSink
->Write(aBuf
+ totalBytesWritten
, aCount
, &bytesWritten
);
166 // ok, this is not a fatal error... just drop our reference to mSink
167 // and continue on as if nothing happened.
168 NS_WARNING("Write failed (non-fatal)");
169 // catch possible misuse of the input stream tee
170 NS_ASSERTION(rv
!= NS_BASE_STREAM_WOULD_BLOCK
,
171 "sink must be a blocking stream");
175 totalBytesWritten
+= bytesWritten
;
176 NS_ASSERTION(bytesWritten
<= aCount
, "wrote too much");
177 aCount
-= bytesWritten
;
183 nsresult
nsInputStreamTee::WriteSegmentFun(nsIInputStream
* aIn
, void* aClosure
,
184 const char* aFromSegment
,
185 uint32_t aOffset
, uint32_t aCount
,
186 uint32_t* aWriteCount
) {
187 nsInputStreamTee
* tee
= reinterpret_cast<nsInputStreamTee
*>(aClosure
);
188 nsresult rv
= tee
->mWriter(aIn
, tee
->mClosure
, aFromSegment
, aOffset
, aCount
,
190 if (NS_FAILED(rv
) || (*aWriteCount
== 0)) {
191 NS_ASSERTION((NS_FAILED(rv
) ? (*aWriteCount
== 0) : true),
192 "writer returned an error with non-zero writeCount");
196 return tee
->TeeSegment(aFromSegment
, *aWriteCount
);
199 NS_IMPL_ISUPPORTS(nsInputStreamTee
, nsIInputStreamTee
, nsIInputStream
)
201 nsInputStreamTee::Close() {
202 if (NS_WARN_IF(!mSource
)) {
203 return NS_ERROR_NOT_INITIALIZED
;
205 nsresult rv
= mSource
->Close();
212 nsInputStreamTee::Available(uint64_t* aAvail
) {
213 if (NS_WARN_IF(!mSource
)) {
214 return NS_ERROR_NOT_INITIALIZED
;
216 return mSource
->Available(aAvail
);
220 nsInputStreamTee::StreamStatus() {
221 if (NS_WARN_IF(!mSource
)) {
222 return NS_ERROR_NOT_INITIALIZED
;
224 return mSource
->StreamStatus();
228 nsInputStreamTee::Read(char* aBuf
, uint32_t aCount
, uint32_t* aBytesRead
) {
229 if (NS_WARN_IF(!mSource
)) {
230 return NS_ERROR_NOT_INITIALIZED
;
233 nsresult rv
= mSource
->Read(aBuf
, aCount
, aBytesRead
);
234 if (NS_FAILED(rv
) || (*aBytesRead
== 0)) {
238 return TeeSegment(aBuf
, *aBytesRead
);
242 nsInputStreamTee::ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
243 uint32_t aCount
, uint32_t* aBytesRead
) {
244 if (NS_WARN_IF(!mSource
)) {
245 return NS_ERROR_NOT_INITIALIZED
;
251 return mSource
->ReadSegments(WriteSegmentFun
, this, aCount
, aBytesRead
);
255 nsInputStreamTee::IsNonBlocking(bool* aResult
) {
256 if (NS_WARN_IF(!mSource
)) {
257 return NS_ERROR_NOT_INITIALIZED
;
259 return mSource
->IsNonBlocking(aResult
);
263 nsInputStreamTee::SetSource(nsIInputStream
* aSource
) {
269 nsInputStreamTee::GetSource(nsIInputStream
** aSource
) {
270 NS_IF_ADDREF(*aSource
= mSource
);
275 nsInputStreamTee::SetSink(nsIOutputStream
* aSink
) {
279 nsresult rv
= aSink
->IsNonBlocking(&nonBlocking
);
280 if (NS_FAILED(rv
) || nonBlocking
) {
281 NS_ERROR("aSink should be a blocking stream");
290 nsInputStreamTee::GetSink(nsIOutputStream
** aSink
) {
291 NS_IF_ADDREF(*aSink
= mSink
);
296 nsInputStreamTee::SetEventTarget(nsIEventTarget
* aEventTarget
) {
297 mEventTarget
= aEventTarget
;
299 // Only need synchronization if this is an async tee
300 mLock
.emplace("nsInputStreamTee.mLock");
306 nsInputStreamTee::GetEventTarget(nsIEventTarget
** aEventTarget
) {
307 NS_IF_ADDREF(*aEventTarget
= mEventTarget
);
311 nsresult
NS_NewInputStreamTeeAsync(nsIInputStream
** aResult
,
312 nsIInputStream
* aSource
,
313 nsIOutputStream
* aSink
,
314 nsIEventTarget
* aEventTarget
) {
317 nsCOMPtr
<nsIInputStreamTee
> tee
= new nsInputStreamTee();
318 rv
= tee
->SetSource(aSource
);
323 rv
= tee
->SetSink(aSink
);
328 rv
= tee
->SetEventTarget(aEventTarget
);
337 nsresult
NS_NewInputStreamTee(nsIInputStream
** aResult
, nsIInputStream
* aSource
,
338 nsIOutputStream
* aSink
) {
339 return NS_NewInputStreamTeeAsync(aResult
, aSource
, aSink
, nullptr);