1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 #include "mozilla/Logging.h"
10 #include "mozilla/Maybe.h"
11 #include "mozilla/Mutex.h"
12 #include "mozilla/Attributes.h"
13 #include "nsIInputStreamTee.h"
14 #include "nsIInputStream.h"
15 #include "nsIOutputStream.h"
17 #include "nsIEventTarget.h"
18 #include "nsThreadUtils.h"
20 using namespace mozilla
;
26 static LazyLogModule
sTeeLog("nsInputStreamTee");
27 #define LOG(args) MOZ_LOG(sTeeLog, mozilla::LogLevel::Debug, args)
29 class nsInputStreamTee final
: public nsIInputStreamTee
{
31 NS_DECL_THREADSAFE_ISUPPORTS
32 NS_DECL_NSIINPUTSTREAM
33 NS_DECL_NSIINPUTSTREAMTEE
37 void InvalidateSink();
40 ~nsInputStreamTee() = default;
42 nsresult
TeeSegment(const char* aBuf
, uint32_t aCount
);
44 static nsresult
WriteSegmentFun(nsIInputStream
*, void*, const char*, uint32_t,
48 nsCOMPtr
<nsIInputStream
> mSource
;
49 nsCOMPtr
<nsIOutputStream
> mSink
;
50 nsCOMPtr
<nsIEventTarget
> mEventTarget
;
51 nsWriteSegmentFun mWriter
; // for implementing ReadSegments
52 void* mClosure
; // for implementing ReadSegments
53 Maybe
<Mutex
> mLock
; // synchronize access to mSinkIsValid
54 bool mSinkIsValid
; // False if TeeWriteEvent fails
57 class nsInputStreamTeeWriteEvent
: public Runnable
{
59 // aTee's lock is held across construction of this object
60 nsInputStreamTeeWriteEvent(const char* aBuf
, uint32_t aCount
,
61 nsIOutputStream
* aSink
, nsInputStreamTee
* aTee
)
62 : mozilla::Runnable("nsInputStreamTeeWriteEvent") {
63 // copy the buffer - will be free'd by dtor
64 mBuf
= (char*)malloc(aCount
);
66 memcpy(mBuf
, (char*)aBuf
, aCount
);
71 mSink
->IsNonBlocking(&isNonBlocking
);
72 NS_ASSERTION(isNonBlocking
== false, "mSink is nonblocking");
76 NS_IMETHOD
Run() override
{
79 "nsInputStreamTeeWriteEvent::Run() "
80 "memory not allocated\n");
83 MOZ_ASSERT(mSink
, "mSink is null!");
85 // The output stream could have been invalidated between when
86 // this event was dispatched and now, so check before writing.
87 if (!mTee
->SinkIsValid()) {
92 ("nsInputStreamTeeWriteEvent::Run() [%p]"
93 "will write %u bytes to %p\n",
94 this, mCount
, mSink
.get()));
96 uint32_t totalBytesWritten
= 0;
99 uint32_t bytesWritten
= 0;
100 rv
= mSink
->Write(mBuf
+ totalBytesWritten
, mCount
, &bytesWritten
);
102 LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %" PRIx32
" in writing",
103 this, static_cast<uint32_t>(rv
)));
104 mTee
->InvalidateSink();
107 totalBytesWritten
+= bytesWritten
;
108 NS_ASSERTION(bytesWritten
<= mCount
, "wrote too much");
109 mCount
-= bytesWritten
;
115 virtual ~nsInputStreamTeeWriteEvent() {
125 nsCOMPtr
<nsIOutputStream
> mSink
;
126 // back pointer to the tee that created this runnable
127 RefPtr
<nsInputStreamTee
> mTee
;
130 nsInputStreamTee::nsInputStreamTee()
131 : mWriter(nullptr), mClosure(nullptr), mSinkIsValid(true) {}
133 bool nsInputStreamTee::SinkIsValid() {
134 MutexAutoLock
lock(*mLock
);
138 void nsInputStreamTee::InvalidateSink() {
139 MutexAutoLock
lock(*mLock
);
140 mSinkIsValid
= false;
143 nsresult
nsInputStreamTee::TeeSegment(const char* aBuf
, uint32_t aCount
) {
145 return NS_OK
; // nothing to do
147 if (mLock
) { // asynchronous case
148 NS_ASSERTION(mEventTarget
, "mEventTarget is null, mLock is not null.");
149 if (!SinkIsValid()) {
150 return NS_OK
; // nothing to do
152 nsCOMPtr
<nsIRunnable
> event
=
153 new nsInputStreamTeeWriteEvent(aBuf
, aCount
, mSink
, this);
154 LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n", this,
156 return mEventTarget
->Dispatch(event
, NS_DISPATCH_NORMAL
);
157 } else { // synchronous case
158 NS_ASSERTION(!mEventTarget
, "mEventTarget is not null, mLock is null.");
160 uint32_t totalBytesWritten
= 0;
162 uint32_t bytesWritten
= 0;
163 rv
= mSink
->Write(aBuf
+ totalBytesWritten
, aCount
, &bytesWritten
);
165 // ok, this is not a fatal error... just drop our reference to mSink
166 // and continue on as if nothing happened.
167 NS_WARNING("Write failed (non-fatal)");
168 // catch possible misuse of the input stream tee
169 NS_ASSERTION(rv
!= NS_BASE_STREAM_WOULD_BLOCK
,
170 "sink must be a blocking stream");
174 totalBytesWritten
+= bytesWritten
;
175 NS_ASSERTION(bytesWritten
<= aCount
, "wrote too much");
176 aCount
-= bytesWritten
;
182 nsresult
nsInputStreamTee::WriteSegmentFun(nsIInputStream
* aIn
, void* aClosure
,
183 const char* aFromSegment
,
184 uint32_t aOffset
, uint32_t aCount
,
185 uint32_t* aWriteCount
) {
186 nsInputStreamTee
* tee
= reinterpret_cast<nsInputStreamTee
*>(aClosure
);
187 nsresult rv
= tee
->mWriter(aIn
, tee
->mClosure
, aFromSegment
, aOffset
, aCount
,
189 if (NS_FAILED(rv
) || (*aWriteCount
== 0)) {
190 NS_ASSERTION((NS_FAILED(rv
) ? (*aWriteCount
== 0) : true),
191 "writer returned an error with non-zero writeCount");
195 return tee
->TeeSegment(aFromSegment
, *aWriteCount
);
198 NS_IMPL_ISUPPORTS(nsInputStreamTee
, nsIInputStreamTee
, nsIInputStream
)
200 nsInputStreamTee::Close() {
201 if (NS_WARN_IF(!mSource
)) {
202 return NS_ERROR_NOT_INITIALIZED
;
204 nsresult rv
= mSource
->Close();
211 nsInputStreamTee::Available(uint64_t* aAvail
) {
212 if (NS_WARN_IF(!mSource
)) {
213 return NS_ERROR_NOT_INITIALIZED
;
215 return mSource
->Available(aAvail
);
219 nsInputStreamTee::StreamStatus() {
220 if (NS_WARN_IF(!mSource
)) {
221 return NS_ERROR_NOT_INITIALIZED
;
223 return mSource
->StreamStatus();
227 nsInputStreamTee::Read(char* aBuf
, uint32_t aCount
, uint32_t* aBytesRead
) {
228 if (NS_WARN_IF(!mSource
)) {
229 return NS_ERROR_NOT_INITIALIZED
;
232 nsresult rv
= mSource
->Read(aBuf
, aCount
, aBytesRead
);
233 if (NS_FAILED(rv
) || (*aBytesRead
== 0)) {
237 return TeeSegment(aBuf
, *aBytesRead
);
241 nsInputStreamTee::ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
242 uint32_t aCount
, uint32_t* aBytesRead
) {
243 if (NS_WARN_IF(!mSource
)) {
244 return NS_ERROR_NOT_INITIALIZED
;
250 return mSource
->ReadSegments(WriteSegmentFun
, this, aCount
, aBytesRead
);
254 nsInputStreamTee::IsNonBlocking(bool* aResult
) {
255 if (NS_WARN_IF(!mSource
)) {
256 return NS_ERROR_NOT_INITIALIZED
;
258 return mSource
->IsNonBlocking(aResult
);
262 nsInputStreamTee::SetSource(nsIInputStream
* aSource
) {
268 nsInputStreamTee::GetSource(nsIInputStream
** aSource
) {
269 NS_IF_ADDREF(*aSource
= mSource
);
274 nsInputStreamTee::SetSink(nsIOutputStream
* aSink
) {
278 nsresult rv
= aSink
->IsNonBlocking(&nonBlocking
);
279 if (NS_FAILED(rv
) || nonBlocking
) {
280 NS_ERROR("aSink should be a blocking stream");
289 nsInputStreamTee::GetSink(nsIOutputStream
** aSink
) {
290 NS_IF_ADDREF(*aSink
= mSink
);
295 nsInputStreamTee::SetEventTarget(nsIEventTarget
* aEventTarget
) {
296 mEventTarget
= aEventTarget
;
298 // Only need synchronization if this is an async tee
299 mLock
.emplace("nsInputStreamTee.mLock");
305 nsInputStreamTee::GetEventTarget(nsIEventTarget
** aEventTarget
) {
306 NS_IF_ADDREF(*aEventTarget
= mEventTarget
);
310 nsresult
NS_NewInputStreamTeeAsync(nsIInputStream
** aResult
,
311 nsIInputStream
* aSource
,
312 nsIOutputStream
* aSink
,
313 nsIEventTarget
* aEventTarget
) {
316 nsCOMPtr
<nsIInputStreamTee
> tee
= new nsInputStreamTee();
317 rv
= tee
->SetSource(aSource
);
322 rv
= tee
->SetSink(aSink
);
327 rv
= tee
->SetEventTarget(aEventTarget
);
336 nsresult
NS_NewInputStreamTee(nsIInputStream
** aResult
, nsIInputStream
* aSource
,
337 nsIOutputStream
* aSink
) {
338 return NS_NewInputStreamTeeAsync(aResult
, aSource
, aSink
, nullptr);