Bug 1860492 - Change file name in test @ toolkit/components/antitracking/test/browser...
[gecko.git] / xpcom / io / nsInputStreamTee.cpp
blob3c0d32e0cbaf60e9ba883a8da14590629c773bee
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include <stdlib.h>
8 #include "mozilla/Logging.h"
10 #include "mozilla/Maybe.h"
11 #include "mozilla/Mutex.h"
12 #include "mozilla/Attributes.h"
13 #include "nsIInputStreamTee.h"
14 #include "nsIInputStream.h"
15 #include "nsIOutputStream.h"
16 #include "nsCOMPtr.h"
17 #include "nsIEventTarget.h"
18 #include "nsThreadUtils.h"
20 using namespace mozilla;
22 #ifdef LOG
23 # undef LOG
24 #endif
26 static LazyLogModule sTeeLog("nsInputStreamTee");
27 #define LOG(args) MOZ_LOG(sTeeLog, mozilla::LogLevel::Debug, args)
29 class nsInputStreamTee final : public nsIInputStreamTee {
30 public:
31 NS_DECL_THREADSAFE_ISUPPORTS
32 NS_DECL_NSIINPUTSTREAM
33 NS_DECL_NSIINPUTSTREAMTEE
35 nsInputStreamTee();
36 bool SinkIsValid();
37 void InvalidateSink();
39 private:
40 ~nsInputStreamTee() = default;
42 nsresult TeeSegment(const char* aBuf, uint32_t aCount);
44 static nsresult WriteSegmentFun(nsIInputStream*, void*, const char*, uint32_t,
45 uint32_t, uint32_t*);
47 private:
48 nsCOMPtr<nsIInputStream> mSource;
49 nsCOMPtr<nsIOutputStream> mSink;
50 nsCOMPtr<nsIEventTarget> mEventTarget;
51 nsWriteSegmentFun mWriter; // for implementing ReadSegments
52 void* mClosure; // for implementing ReadSegments
53 Maybe<Mutex> mLock; // synchronize access to mSinkIsValid
54 bool mSinkIsValid; // False if TeeWriteEvent fails
57 class nsInputStreamTeeWriteEvent : public Runnable {
58 public:
59 // aTee's lock is held across construction of this object
60 nsInputStreamTeeWriteEvent(const char* aBuf, uint32_t aCount,
61 nsIOutputStream* aSink, nsInputStreamTee* aTee)
62 : mozilla::Runnable("nsInputStreamTeeWriteEvent") {
63 // copy the buffer - will be free'd by dtor
64 mBuf = (char*)malloc(aCount);
65 if (mBuf) {
66 memcpy(mBuf, (char*)aBuf, aCount);
68 mCount = aCount;
69 mSink = aSink;
70 bool isNonBlocking;
71 mSink->IsNonBlocking(&isNonBlocking);
72 NS_ASSERTION(isNonBlocking == false, "mSink is nonblocking");
73 mTee = aTee;
76 NS_IMETHOD Run() override {
77 if (!mBuf) {
78 NS_WARNING(
79 "nsInputStreamTeeWriteEvent::Run() "
80 "memory not allocated\n");
81 return NS_OK;
83 MOZ_ASSERT(mSink, "mSink is null!");
85 // The output stream could have been invalidated between when
86 // this event was dispatched and now, so check before writing.
87 if (!mTee->SinkIsValid()) {
88 return NS_OK;
91 LOG(
92 ("nsInputStreamTeeWriteEvent::Run() [%p]"
93 "will write %u bytes to %p\n",
94 this, mCount, mSink.get()));
96 uint32_t totalBytesWritten = 0;
97 while (mCount) {
98 nsresult rv;
99 uint32_t bytesWritten = 0;
100 rv = mSink->Write(mBuf + totalBytesWritten, mCount, &bytesWritten);
101 if (NS_FAILED(rv)) {
102 LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %" PRIx32 " in writing",
103 this, static_cast<uint32_t>(rv)));
104 mTee->InvalidateSink();
105 break;
107 totalBytesWritten += bytesWritten;
108 NS_ASSERTION(bytesWritten <= mCount, "wrote too much");
109 mCount -= bytesWritten;
111 return NS_OK;
114 protected:
115 virtual ~nsInputStreamTeeWriteEvent() {
116 if (mBuf) {
117 free(mBuf);
119 mBuf = nullptr;
122 private:
123 char* mBuf;
124 uint32_t mCount;
125 nsCOMPtr<nsIOutputStream> mSink;
126 // back pointer to the tee that created this runnable
127 RefPtr<nsInputStreamTee> mTee;
130 nsInputStreamTee::nsInputStreamTee()
131 : mWriter(nullptr), mClosure(nullptr), mSinkIsValid(true) {}
133 bool nsInputStreamTee::SinkIsValid() {
134 MutexAutoLock lock(*mLock);
135 return mSinkIsValid;
138 void nsInputStreamTee::InvalidateSink() {
139 MutexAutoLock lock(*mLock);
140 mSinkIsValid = false;
143 nsresult nsInputStreamTee::TeeSegment(const char* aBuf, uint32_t aCount) {
144 if (!mSink) {
145 return NS_OK; // nothing to do
147 if (mLock) { // asynchronous case
148 NS_ASSERTION(mEventTarget, "mEventTarget is null, mLock is not null.");
149 if (!SinkIsValid()) {
150 return NS_OK; // nothing to do
152 nsCOMPtr<nsIRunnable> event =
153 new nsInputStreamTeeWriteEvent(aBuf, aCount, mSink, this);
154 LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n", this,
155 aCount));
156 return mEventTarget->Dispatch(event, NS_DISPATCH_NORMAL);
157 } else { // synchronous case
158 NS_ASSERTION(!mEventTarget, "mEventTarget is not null, mLock is null.");
159 nsresult rv;
160 uint32_t totalBytesWritten = 0;
161 while (aCount) {
162 uint32_t bytesWritten = 0;
163 rv = mSink->Write(aBuf + totalBytesWritten, aCount, &bytesWritten);
164 if (NS_FAILED(rv)) {
165 // ok, this is not a fatal error... just drop our reference to mSink
166 // and continue on as if nothing happened.
167 NS_WARNING("Write failed (non-fatal)");
168 // catch possible misuse of the input stream tee
169 NS_ASSERTION(rv != NS_BASE_STREAM_WOULD_BLOCK,
170 "sink must be a blocking stream");
171 mSink = nullptr;
172 break;
174 totalBytesWritten += bytesWritten;
175 NS_ASSERTION(bytesWritten <= aCount, "wrote too much");
176 aCount -= bytesWritten;
178 return NS_OK;
182 nsresult nsInputStreamTee::WriteSegmentFun(nsIInputStream* aIn, void* aClosure,
183 const char* aFromSegment,
184 uint32_t aOffset, uint32_t aCount,
185 uint32_t* aWriteCount) {
186 nsInputStreamTee* tee = reinterpret_cast<nsInputStreamTee*>(aClosure);
187 nsresult rv = tee->mWriter(aIn, tee->mClosure, aFromSegment, aOffset, aCount,
188 aWriteCount);
189 if (NS_FAILED(rv) || (*aWriteCount == 0)) {
190 NS_ASSERTION((NS_FAILED(rv) ? (*aWriteCount == 0) : true),
191 "writer returned an error with non-zero writeCount");
192 return rv;
195 return tee->TeeSegment(aFromSegment, *aWriteCount);
198 NS_IMPL_ISUPPORTS(nsInputStreamTee, nsIInputStreamTee, nsIInputStream)
199 NS_IMETHODIMP
200 nsInputStreamTee::Close() {
201 if (NS_WARN_IF(!mSource)) {
202 return NS_ERROR_NOT_INITIALIZED;
204 nsresult rv = mSource->Close();
205 mSource = nullptr;
206 mSink = nullptr;
207 return rv;
210 NS_IMETHODIMP
211 nsInputStreamTee::Available(uint64_t* aAvail) {
212 if (NS_WARN_IF(!mSource)) {
213 return NS_ERROR_NOT_INITIALIZED;
215 return mSource->Available(aAvail);
218 NS_IMETHODIMP
219 nsInputStreamTee::StreamStatus() {
220 if (NS_WARN_IF(!mSource)) {
221 return NS_ERROR_NOT_INITIALIZED;
223 return mSource->StreamStatus();
226 NS_IMETHODIMP
227 nsInputStreamTee::Read(char* aBuf, uint32_t aCount, uint32_t* aBytesRead) {
228 if (NS_WARN_IF(!mSource)) {
229 return NS_ERROR_NOT_INITIALIZED;
232 nsresult rv = mSource->Read(aBuf, aCount, aBytesRead);
233 if (NS_FAILED(rv) || (*aBytesRead == 0)) {
234 return rv;
237 return TeeSegment(aBuf, *aBytesRead);
240 NS_IMETHODIMP
241 nsInputStreamTee::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
242 uint32_t aCount, uint32_t* aBytesRead) {
243 if (NS_WARN_IF(!mSource)) {
244 return NS_ERROR_NOT_INITIALIZED;
247 mWriter = aWriter;
248 mClosure = aClosure;
250 return mSource->ReadSegments(WriteSegmentFun, this, aCount, aBytesRead);
253 NS_IMETHODIMP
254 nsInputStreamTee::IsNonBlocking(bool* aResult) {
255 if (NS_WARN_IF(!mSource)) {
256 return NS_ERROR_NOT_INITIALIZED;
258 return mSource->IsNonBlocking(aResult);
261 NS_IMETHODIMP
262 nsInputStreamTee::SetSource(nsIInputStream* aSource) {
263 mSource = aSource;
264 return NS_OK;
267 NS_IMETHODIMP
268 nsInputStreamTee::GetSource(nsIInputStream** aSource) {
269 NS_IF_ADDREF(*aSource = mSource);
270 return NS_OK;
273 NS_IMETHODIMP
274 nsInputStreamTee::SetSink(nsIOutputStream* aSink) {
275 #ifdef DEBUG
276 if (aSink) {
277 bool nonBlocking;
278 nsresult rv = aSink->IsNonBlocking(&nonBlocking);
279 if (NS_FAILED(rv) || nonBlocking) {
280 NS_ERROR("aSink should be a blocking stream");
283 #endif
284 mSink = aSink;
285 return NS_OK;
288 NS_IMETHODIMP
289 nsInputStreamTee::GetSink(nsIOutputStream** aSink) {
290 NS_IF_ADDREF(*aSink = mSink);
291 return NS_OK;
294 NS_IMETHODIMP
295 nsInputStreamTee::SetEventTarget(nsIEventTarget* aEventTarget) {
296 mEventTarget = aEventTarget;
297 if (mEventTarget) {
298 // Only need synchronization if this is an async tee
299 mLock.emplace("nsInputStreamTee.mLock");
301 return NS_OK;
304 NS_IMETHODIMP
305 nsInputStreamTee::GetEventTarget(nsIEventTarget** aEventTarget) {
306 NS_IF_ADDREF(*aEventTarget = mEventTarget);
307 return NS_OK;
310 nsresult NS_NewInputStreamTeeAsync(nsIInputStream** aResult,
311 nsIInputStream* aSource,
312 nsIOutputStream* aSink,
313 nsIEventTarget* aEventTarget) {
314 nsresult rv;
316 nsCOMPtr<nsIInputStreamTee> tee = new nsInputStreamTee();
317 rv = tee->SetSource(aSource);
318 if (NS_FAILED(rv)) {
319 return rv;
322 rv = tee->SetSink(aSink);
323 if (NS_FAILED(rv)) {
324 return rv;
327 rv = tee->SetEventTarget(aEventTarget);
328 if (NS_FAILED(rv)) {
329 return rv;
332 tee.forget(aResult);
333 return rv;
336 nsresult NS_NewInputStreamTee(nsIInputStream** aResult, nsIInputStream* aSource,
337 nsIOutputStream* aSink) {
338 return NS_NewInputStreamTeeAsync(aResult, aSource, aSink, nullptr);
341 #undef LOG