Bumping manifests a=b2g-bump
[gecko.git] / xpcom / io / nsInputStreamTee.cpp
blob3ac142985e510f5f8b2db6024ea880d9c42b0a84
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include <stdlib.h>
8 #include "prlog.h"
10 #include "mozilla/Mutex.h"
11 #include "mozilla/Attributes.h"
12 #include "nsIInputStreamTee.h"
13 #include "nsIInputStream.h"
14 #include "nsIOutputStream.h"
15 #include "nsCOMPtr.h"
16 #include "nsAutoPtr.h"
17 #include "nsIEventTarget.h"
18 #include "nsThreadUtils.h"
20 using namespace mozilla;
// Drop any LOG macro that is already defined so the file-local definition
// below does not clash with it.
#ifdef LOG
#undef LOG
#endif
#ifdef PR_LOGGING
/**
 * Returns the log module used by this file. The module is created with
 * PR_NewLogModule("nsInputStreamTee") the first time it is needed and the
 * pointer is cached in a function-local static afterwards.
 */
static PRLogModuleInfo*
GetTeeLog()
{
  static PRLogModuleInfo* sTeeLogModule;
  if (sTeeLogModule) {
    return sTeeLogModule;
  }
  sTeeLogModule = PR_NewLogModule("nsInputStreamTee");
  return sTeeLogModule;
}
// LOG(("fmt", args...)) forwards to PR_LOG at debug level.
#define LOG(args) PR_LOG(GetTeeLog(), PR_LOG_DEBUG, args)
#else
// Logging is compiled out entirely when PR_LOGGING is not defined.
#define LOG(args)
#endif
40 class nsInputStreamTee MOZ_FINAL : public nsIInputStreamTee
42 public:
43 NS_DECL_THREADSAFE_ISUPPORTS
44 NS_DECL_NSIINPUTSTREAM
45 NS_DECL_NSIINPUTSTREAMTEE
47 nsInputStreamTee();
48 bool SinkIsValid();
49 void InvalidateSink();
51 private:
52 ~nsInputStreamTee()
56 nsresult TeeSegment(const char* aBuf, uint32_t aCount);
58 static NS_METHOD WriteSegmentFun(nsIInputStream*, void*, const char*,
59 uint32_t, uint32_t, uint32_t*);
61 private:
62 nsCOMPtr<nsIInputStream> mSource;
63 nsCOMPtr<nsIOutputStream> mSink;
64 nsCOMPtr<nsIEventTarget> mEventTarget;
65 nsWriteSegmentFun mWriter; // for implementing ReadSegments
66 void* mClosure; // for implementing ReadSegments
67 nsAutoPtr<Mutex> mLock; // synchronize access to mSinkIsValid
68 bool mSinkIsValid; // False if TeeWriteEvent fails
71 class nsInputStreamTeeWriteEvent : public nsRunnable
73 public:
74 // aTee's lock is held across construction of this object
75 nsInputStreamTeeWriteEvent(const char* aBuf, uint32_t aCount,
76 nsIOutputStream* aSink, nsInputStreamTee* aTee)
78 // copy the buffer - will be free'd by dtor
79 mBuf = (char*)malloc(aCount);
80 if (mBuf) {
81 memcpy(mBuf, (char*)aBuf, aCount);
83 mCount = aCount;
84 mSink = aSink;
85 bool isNonBlocking;
86 mSink->IsNonBlocking(&isNonBlocking);
87 NS_ASSERTION(isNonBlocking == false, "mSink is nonblocking");
88 mTee = aTee;
91 NS_IMETHOD Run()
93 if (!mBuf) {
94 NS_WARNING("nsInputStreamTeeWriteEvent::Run() "
95 "memory not allocated\n");
96 return NS_OK;
98 NS_ABORT_IF_FALSE(mSink, "mSink is null!");
100 // The output stream could have been invalidated between when
101 // this event was dispatched and now, so check before writing.
102 if (!mTee->SinkIsValid()) {
103 return NS_OK;
106 LOG(("nsInputStreamTeeWriteEvent::Run() [%p]"
107 "will write %u bytes to %p\n",
108 this, mCount, mSink.get()));
110 uint32_t totalBytesWritten = 0;
111 while (mCount) {
112 nsresult rv;
113 uint32_t bytesWritten = 0;
114 rv = mSink->Write(mBuf + totalBytesWritten, mCount, &bytesWritten);
115 if (NS_FAILED(rv)) {
116 LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %x in writing",
117 this, rv));
118 mTee->InvalidateSink();
119 break;
121 totalBytesWritten += bytesWritten;
122 NS_ASSERTION(bytesWritten <= mCount, "wrote too much");
123 mCount -= bytesWritten;
125 return NS_OK;
128 protected:
129 virtual ~nsInputStreamTeeWriteEvent()
131 if (mBuf) {
132 free(mBuf);
134 mBuf = nullptr;
137 private:
138 char* mBuf;
139 uint32_t mCount;
140 nsCOMPtr<nsIOutputStream> mSink;
141 // back pointer to the tee that created this runnable
142 nsRefPtr<nsInputStreamTee> mTee;
145 nsInputStreamTee::nsInputStreamTee(): mLock(nullptr)
146 , mSinkIsValid(true)
150 bool
151 nsInputStreamTee::SinkIsValid()
153 MutexAutoLock lock(*mLock);
154 return mSinkIsValid;
157 void
158 nsInputStreamTee::InvalidateSink()
160 MutexAutoLock lock(*mLock);
161 mSinkIsValid = false;
164 nsresult
165 nsInputStreamTee::TeeSegment(const char* aBuf, uint32_t aCount)
167 if (!mSink) {
168 return NS_OK; // nothing to do
170 if (mLock) { // asynchronous case
171 NS_ASSERTION(mEventTarget, "mEventTarget is null, mLock is not null.");
172 if (!SinkIsValid()) {
173 return NS_OK; // nothing to do
175 nsRefPtr<nsIRunnable> event =
176 new nsInputStreamTeeWriteEvent(aBuf, aCount, mSink, this);
177 LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n",
178 this, aCount));
179 return mEventTarget->Dispatch(event, NS_DISPATCH_NORMAL);
180 } else { // synchronous case
181 NS_ASSERTION(!mEventTarget, "mEventTarget is not null, mLock is null.");
182 nsresult rv;
183 uint32_t totalBytesWritten = 0;
184 while (aCount) {
185 uint32_t bytesWritten = 0;
186 rv = mSink->Write(aBuf + totalBytesWritten, aCount, &bytesWritten);
187 if (NS_FAILED(rv)) {
188 // ok, this is not a fatal error... just drop our reference to mSink
189 // and continue on as if nothing happened.
190 NS_WARNING("Write failed (non-fatal)");
191 // catch possible misuse of the input stream tee
192 NS_ASSERTION(rv != NS_BASE_STREAM_WOULD_BLOCK, "sink must be a blocking stream");
193 mSink = 0;
194 break;
196 totalBytesWritten += bytesWritten;
197 NS_ASSERTION(bytesWritten <= aCount, "wrote too much");
198 aCount -= bytesWritten;
200 return NS_OK;
204 NS_METHOD
205 nsInputStreamTee::WriteSegmentFun(nsIInputStream* aIn, void* aClosure,
206 const char* aFromSegment, uint32_t aOffset,
207 uint32_t aCount, uint32_t* aWriteCount)
209 nsInputStreamTee* tee = reinterpret_cast<nsInputStreamTee*>(aClosure);
210 nsresult rv = tee->mWriter(aIn, tee->mClosure, aFromSegment, aOffset,
211 aCount, aWriteCount);
212 if (NS_FAILED(rv) || (*aWriteCount == 0)) {
213 NS_ASSERTION((NS_FAILED(rv) ? (*aWriteCount == 0) : true),
214 "writer returned an error with non-zero writeCount");
215 return rv;
218 return tee->TeeSegment(aFromSegment, *aWriteCount);
221 NS_IMPL_ISUPPORTS(nsInputStreamTee,
222 nsIInputStreamTee,
223 nsIInputStream)
224 NS_IMETHODIMP
225 nsInputStreamTee::Close()
227 if (NS_WARN_IF(!mSource)) {
228 return NS_ERROR_NOT_INITIALIZED;
230 nsresult rv = mSource->Close();
231 mSource = 0;
232 mSink = 0;
233 return rv;
236 NS_IMETHODIMP
237 nsInputStreamTee::Available(uint64_t* aAvail)
239 if (NS_WARN_IF(!mSource)) {
240 return NS_ERROR_NOT_INITIALIZED;
242 return mSource->Available(aAvail);
245 NS_IMETHODIMP
246 nsInputStreamTee::Read(char* aBuf, uint32_t aCount, uint32_t* aBytesRead)
248 if (NS_WARN_IF(!mSource)) {
249 return NS_ERROR_NOT_INITIALIZED;
252 nsresult rv = mSource->Read(aBuf, aCount, aBytesRead);
253 if (NS_FAILED(rv) || (*aBytesRead == 0)) {
254 return rv;
257 return TeeSegment(aBuf, *aBytesRead);
260 NS_IMETHODIMP
261 nsInputStreamTee::ReadSegments(nsWriteSegmentFun aWriter,
262 void* aClosure,
263 uint32_t aCount,
264 uint32_t* aBytesRead)
266 if (NS_WARN_IF(!mSource)) {
267 return NS_ERROR_NOT_INITIALIZED;
270 mWriter = aWriter;
271 mClosure = aClosure;
273 return mSource->ReadSegments(WriteSegmentFun, this, aCount, aBytesRead);
276 NS_IMETHODIMP
277 nsInputStreamTee::IsNonBlocking(bool* aResult)
279 if (NS_WARN_IF(!mSource)) {
280 return NS_ERROR_NOT_INITIALIZED;
282 return mSource->IsNonBlocking(aResult);
285 NS_IMETHODIMP
286 nsInputStreamTee::SetSource(nsIInputStream* aSource)
288 mSource = aSource;
289 return NS_OK;
292 NS_IMETHODIMP
293 nsInputStreamTee::GetSource(nsIInputStream** aSource)
295 NS_IF_ADDREF(*aSource = mSource);
296 return NS_OK;
299 NS_IMETHODIMP
300 nsInputStreamTee::SetSink(nsIOutputStream* aSink)
302 #ifdef DEBUG
303 if (aSink) {
304 bool nonBlocking;
305 nsresult rv = aSink->IsNonBlocking(&nonBlocking);
306 if (NS_FAILED(rv) || nonBlocking) {
307 NS_ERROR("aSink should be a blocking stream");
310 #endif
311 mSink = aSink;
312 return NS_OK;
315 NS_IMETHODIMP
316 nsInputStreamTee::GetSink(nsIOutputStream** aSink)
318 NS_IF_ADDREF(*aSink = mSink);
319 return NS_OK;
322 NS_IMETHODIMP
323 nsInputStreamTee::SetEventTarget(nsIEventTarget* aEventTarget)
325 mEventTarget = aEventTarget;
326 if (mEventTarget) {
327 // Only need synchronization if this is an async tee
328 mLock = new Mutex("nsInputStreamTee.mLock");
330 return NS_OK;
333 NS_IMETHODIMP
334 nsInputStreamTee::GetEventTarget(nsIEventTarget** aEventTarget)
336 NS_IF_ADDREF(*aEventTarget = mEventTarget);
337 return NS_OK;
341 nsresult
342 NS_NewInputStreamTeeAsync(nsIInputStream** aResult,
343 nsIInputStream* aSource,
344 nsIOutputStream* aSink,
345 nsIEventTarget* aEventTarget)
347 nsresult rv;
349 nsCOMPtr<nsIInputStreamTee> tee = new nsInputStreamTee();
350 if (!tee) {
351 return NS_ERROR_OUT_OF_MEMORY;
354 rv = tee->SetSource(aSource);
355 if (NS_FAILED(rv)) {
356 return rv;
359 rv = tee->SetSink(aSink);
360 if (NS_FAILED(rv)) {
361 return rv;
364 rv = tee->SetEventTarget(aEventTarget);
365 if (NS_FAILED(rv)) {
366 return rv;
369 NS_ADDREF(*aResult = tee);
370 return rv;
373 nsresult
374 NS_NewInputStreamTee(nsIInputStream** aResult,
375 nsIInputStream* aSource,
376 nsIOutputStream* aSink)
378 return NS_NewInputStreamTeeAsync(aResult, aSource, aSink, nullptr);
381 #undef LOG