1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
10 #include "mozilla/Mutex.h"
11 #include "mozilla/Attributes.h"
12 #include "nsIInputStreamTee.h"
13 #include "nsIInputStream.h"
14 #include "nsIOutputStream.h"
16 #include "nsAutoPtr.h"
17 #include "nsIEventTarget.h"
18 #include "nsThreadUtils.h"
20 using namespace mozilla
;
26 static PRLogModuleInfo
*
29 static PRLogModuleInfo
* sLog
;
31 sLog
= PR_NewLogModule("nsInputStreamTee");
35 #define LOG(args) PR_LOG(GetTeeLog(), PR_LOG_DEBUG, args)
40 class nsInputStreamTee MOZ_FINAL
: public nsIInputStreamTee
43 NS_DECL_THREADSAFE_ISUPPORTS
44 NS_DECL_NSIINPUTSTREAM
45 NS_DECL_NSIINPUTSTREAMTEE
49 void InvalidateSink();
56 nsresult
TeeSegment(const char* aBuf
, uint32_t aCount
);
58 static NS_METHOD
WriteSegmentFun(nsIInputStream
*, void*, const char*,
59 uint32_t, uint32_t, uint32_t*);
62 nsCOMPtr
<nsIInputStream
> mSource
;
63 nsCOMPtr
<nsIOutputStream
> mSink
;
64 nsCOMPtr
<nsIEventTarget
> mEventTarget
;
65 nsWriteSegmentFun mWriter
; // for implementing ReadSegments
66 void* mClosure
; // for implementing ReadSegments
67 nsAutoPtr
<Mutex
> mLock
; // synchronize access to mSinkIsValid
68 bool mSinkIsValid
; // False if TeeWriteEvent fails
71 class nsInputStreamTeeWriteEvent
: public nsRunnable
74 // aTee's lock is held across construction of this object
75 nsInputStreamTeeWriteEvent(const char* aBuf
, uint32_t aCount
,
76 nsIOutputStream
* aSink
, nsInputStreamTee
* aTee
)
78 // copy the buffer - will be free'd by dtor
79 mBuf
= (char*)malloc(aCount
);
81 memcpy(mBuf
, (char*)aBuf
, aCount
);
86 mSink
->IsNonBlocking(&isNonBlocking
);
87 NS_ASSERTION(isNonBlocking
== false, "mSink is nonblocking");
94 NS_WARNING("nsInputStreamTeeWriteEvent::Run() "
95 "memory not allocated\n");
98 NS_ABORT_IF_FALSE(mSink
, "mSink is null!");
100 // The output stream could have been invalidated between when
101 // this event was dispatched and now, so check before writing.
102 if (!mTee
->SinkIsValid()) {
106 LOG(("nsInputStreamTeeWriteEvent::Run() [%p]"
107 "will write %u bytes to %p\n",
108 this, mCount
, mSink
.get()));
110 uint32_t totalBytesWritten
= 0;
113 uint32_t bytesWritten
= 0;
114 rv
= mSink
->Write(mBuf
+ totalBytesWritten
, mCount
, &bytesWritten
);
116 LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %x in writing",
118 mTee
->InvalidateSink();
121 totalBytesWritten
+= bytesWritten
;
122 NS_ASSERTION(bytesWritten
<= mCount
, "wrote too much");
123 mCount
-= bytesWritten
;
129 virtual ~nsInputStreamTeeWriteEvent()
140 nsCOMPtr
<nsIOutputStream
> mSink
;
141 // back pointer to the tee that created this runnable
142 nsRefPtr
<nsInputStreamTee
> mTee
;
145 nsInputStreamTee::nsInputStreamTee(): mLock(nullptr)
151 nsInputStreamTee::SinkIsValid()
153 MutexAutoLock
lock(*mLock
);
158 nsInputStreamTee::InvalidateSink()
160 MutexAutoLock
lock(*mLock
);
161 mSinkIsValid
= false;
165 nsInputStreamTee::TeeSegment(const char* aBuf
, uint32_t aCount
)
168 return NS_OK
; // nothing to do
170 if (mLock
) { // asynchronous case
171 NS_ASSERTION(mEventTarget
, "mEventTarget is null, mLock is not null.");
172 if (!SinkIsValid()) {
173 return NS_OK
; // nothing to do
175 nsRefPtr
<nsIRunnable
> event
=
176 new nsInputStreamTeeWriteEvent(aBuf
, aCount
, mSink
, this);
177 LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n",
179 return mEventTarget
->Dispatch(event
, NS_DISPATCH_NORMAL
);
180 } else { // synchronous case
181 NS_ASSERTION(!mEventTarget
, "mEventTarget is not null, mLock is null.");
183 uint32_t totalBytesWritten
= 0;
185 uint32_t bytesWritten
= 0;
186 rv
= mSink
->Write(aBuf
+ totalBytesWritten
, aCount
, &bytesWritten
);
188 // ok, this is not a fatal error... just drop our reference to mSink
189 // and continue on as if nothing happened.
190 NS_WARNING("Write failed (non-fatal)");
191 // catch possible misuse of the input stream tee
192 NS_ASSERTION(rv
!= NS_BASE_STREAM_WOULD_BLOCK
, "sink must be a blocking stream");
196 totalBytesWritten
+= bytesWritten
;
197 NS_ASSERTION(bytesWritten
<= aCount
, "wrote too much");
198 aCount
-= bytesWritten
;
205 nsInputStreamTee::WriteSegmentFun(nsIInputStream
* aIn
, void* aClosure
,
206 const char* aFromSegment
, uint32_t aOffset
,
207 uint32_t aCount
, uint32_t* aWriteCount
)
209 nsInputStreamTee
* tee
= reinterpret_cast<nsInputStreamTee
*>(aClosure
);
210 nsresult rv
= tee
->mWriter(aIn
, tee
->mClosure
, aFromSegment
, aOffset
,
211 aCount
, aWriteCount
);
212 if (NS_FAILED(rv
) || (*aWriteCount
== 0)) {
213 NS_ASSERTION((NS_FAILED(rv
) ? (*aWriteCount
== 0) : true),
214 "writer returned an error with non-zero writeCount");
218 return tee
->TeeSegment(aFromSegment
, *aWriteCount
);
221 NS_IMPL_ISUPPORTS(nsInputStreamTee
,
225 nsInputStreamTee::Close()
227 if (NS_WARN_IF(!mSource
)) {
228 return NS_ERROR_NOT_INITIALIZED
;
230 nsresult rv
= mSource
->Close();
237 nsInputStreamTee::Available(uint64_t* aAvail
)
239 if (NS_WARN_IF(!mSource
)) {
240 return NS_ERROR_NOT_INITIALIZED
;
242 return mSource
->Available(aAvail
);
246 nsInputStreamTee::Read(char* aBuf
, uint32_t aCount
, uint32_t* aBytesRead
)
248 if (NS_WARN_IF(!mSource
)) {
249 return NS_ERROR_NOT_INITIALIZED
;
252 nsresult rv
= mSource
->Read(aBuf
, aCount
, aBytesRead
);
253 if (NS_FAILED(rv
) || (*aBytesRead
== 0)) {
257 return TeeSegment(aBuf
, *aBytesRead
);
261 nsInputStreamTee::ReadSegments(nsWriteSegmentFun aWriter
,
264 uint32_t* aBytesRead
)
266 if (NS_WARN_IF(!mSource
)) {
267 return NS_ERROR_NOT_INITIALIZED
;
273 return mSource
->ReadSegments(WriteSegmentFun
, this, aCount
, aBytesRead
);
277 nsInputStreamTee::IsNonBlocking(bool* aResult
)
279 if (NS_WARN_IF(!mSource
)) {
280 return NS_ERROR_NOT_INITIALIZED
;
282 return mSource
->IsNonBlocking(aResult
);
286 nsInputStreamTee::SetSource(nsIInputStream
* aSource
)
293 nsInputStreamTee::GetSource(nsIInputStream
** aSource
)
295 NS_IF_ADDREF(*aSource
= mSource
);
300 nsInputStreamTee::SetSink(nsIOutputStream
* aSink
)
305 nsresult rv
= aSink
->IsNonBlocking(&nonBlocking
);
306 if (NS_FAILED(rv
) || nonBlocking
) {
307 NS_ERROR("aSink should be a blocking stream");
316 nsInputStreamTee::GetSink(nsIOutputStream
** aSink
)
318 NS_IF_ADDREF(*aSink
= mSink
);
323 nsInputStreamTee::SetEventTarget(nsIEventTarget
* aEventTarget
)
325 mEventTarget
= aEventTarget
;
327 // Only need synchronization if this is an async tee
328 mLock
= new Mutex("nsInputStreamTee.mLock");
334 nsInputStreamTee::GetEventTarget(nsIEventTarget
** aEventTarget
)
336 NS_IF_ADDREF(*aEventTarget
= mEventTarget
);
342 NS_NewInputStreamTeeAsync(nsIInputStream
** aResult
,
343 nsIInputStream
* aSource
,
344 nsIOutputStream
* aSink
,
345 nsIEventTarget
* aEventTarget
)
349 nsCOMPtr
<nsIInputStreamTee
> tee
= new nsInputStreamTee();
351 return NS_ERROR_OUT_OF_MEMORY
;
354 rv
= tee
->SetSource(aSource
);
359 rv
= tee
->SetSink(aSink
);
364 rv
= tee
->SetEventTarget(aEventTarget
);
369 NS_ADDREF(*aResult
= tee
);
374 NS_NewInputStreamTee(nsIInputStream
** aResult
,
375 nsIInputStream
* aSource
,
376 nsIOutputStream
* aSink
)
378 return NS_NewInputStreamTeeAsync(aResult
, aSource
, aSink
, nullptr);