Bug 1908539 restrict MacOS platform audio processing to Nightly r=webrtc-reviewers...
[gecko.git] / xpcom / io / nsInputStreamTee.cpp
blob656f1dcb3840a2d29c8a83b2dbe2ab3051edeee9
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include <stdlib.h>
8 #include "mozilla/Logging.h"
10 #include "mozilla/Maybe.h"
11 #include "mozilla/Mutex.h"
12 #include "mozilla/Attributes.h"
13 #include "mozilla/IntegerPrintfMacros.h"
14 #include "nsIInputStreamTee.h"
15 #include "nsIInputStream.h"
16 #include "nsIOutputStream.h"
17 #include "nsCOMPtr.h"
18 #include "nsIEventTarget.h"
19 #include "nsThreadUtils.h"
21 using namespace mozilla;
23 #ifdef LOG
24 # undef LOG
25 #endif
27 static LazyLogModule sTeeLog("nsInputStreamTee");
28 #define LOG(args) MOZ_LOG(sTeeLog, mozilla::LogLevel::Debug, args)
30 class nsInputStreamTee final : public nsIInputStreamTee {
31 public:
32 NS_DECL_THREADSAFE_ISUPPORTS
33 NS_DECL_NSIINPUTSTREAM
34 NS_DECL_NSIINPUTSTREAMTEE
36 nsInputStreamTee();
37 bool SinkIsValid();
38 void InvalidateSink();
40 private:
41 ~nsInputStreamTee() = default;
43 nsresult TeeSegment(const char* aBuf, uint32_t aCount);
45 static nsresult WriteSegmentFun(nsIInputStream*, void*, const char*, uint32_t,
46 uint32_t, uint32_t*);
48 private:
49 nsCOMPtr<nsIInputStream> mSource;
50 nsCOMPtr<nsIOutputStream> mSink;
51 nsCOMPtr<nsIEventTarget> mEventTarget;
52 nsWriteSegmentFun mWriter; // for implementing ReadSegments
53 void* mClosure; // for implementing ReadSegments
54 Maybe<Mutex> mLock; // synchronize access to mSinkIsValid
55 bool mSinkIsValid; // False if TeeWriteEvent fails
58 class nsInputStreamTeeWriteEvent : public Runnable {
59 public:
60 // aTee's lock is held across construction of this object
61 nsInputStreamTeeWriteEvent(const char* aBuf, uint32_t aCount,
62 nsIOutputStream* aSink, nsInputStreamTee* aTee)
63 : mozilla::Runnable("nsInputStreamTeeWriteEvent") {
64 // copy the buffer - will be free'd by dtor
65 mBuf = (char*)malloc(aCount);
66 if (mBuf) {
67 memcpy(mBuf, (char*)aBuf, aCount);
69 mCount = aCount;
70 mSink = aSink;
71 bool isNonBlocking;
72 mSink->IsNonBlocking(&isNonBlocking);
73 NS_ASSERTION(isNonBlocking == false, "mSink is nonblocking");
74 mTee = aTee;
77 NS_IMETHOD Run() override {
78 if (!mBuf) {
79 NS_WARNING(
80 "nsInputStreamTeeWriteEvent::Run() "
81 "memory not allocated\n");
82 return NS_OK;
84 MOZ_ASSERT(mSink, "mSink is null!");
86 // The output stream could have been invalidated between when
87 // this event was dispatched and now, so check before writing.
88 if (!mTee->SinkIsValid()) {
89 return NS_OK;
92 LOG(
93 ("nsInputStreamTeeWriteEvent::Run() [%p]"
94 "will write %u bytes to %p\n",
95 this, mCount, mSink.get()));
97 uint32_t totalBytesWritten = 0;
98 while (mCount) {
99 nsresult rv;
100 uint32_t bytesWritten = 0;
101 rv = mSink->Write(mBuf + totalBytesWritten, mCount, &bytesWritten);
102 if (NS_FAILED(rv)) {
103 LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %" PRIx32 " in writing",
104 this, static_cast<uint32_t>(rv)));
105 mTee->InvalidateSink();
106 break;
108 totalBytesWritten += bytesWritten;
109 NS_ASSERTION(bytesWritten <= mCount, "wrote too much");
110 mCount -= bytesWritten;
112 return NS_OK;
115 protected:
116 virtual ~nsInputStreamTeeWriteEvent() {
117 if (mBuf) {
118 free(mBuf);
120 mBuf = nullptr;
123 private:
124 char* mBuf;
125 uint32_t mCount;
126 nsCOMPtr<nsIOutputStream> mSink;
127 // back pointer to the tee that created this runnable
128 RefPtr<nsInputStreamTee> mTee;
131 nsInputStreamTee::nsInputStreamTee()
132 : mWriter(nullptr), mClosure(nullptr), mSinkIsValid(true) {}
134 bool nsInputStreamTee::SinkIsValid() {
135 MutexAutoLock lock(*mLock);
136 return mSinkIsValid;
139 void nsInputStreamTee::InvalidateSink() {
140 MutexAutoLock lock(*mLock);
141 mSinkIsValid = false;
144 nsresult nsInputStreamTee::TeeSegment(const char* aBuf, uint32_t aCount) {
145 if (!mSink) {
146 return NS_OK; // nothing to do
148 if (mLock) { // asynchronous case
149 NS_ASSERTION(mEventTarget, "mEventTarget is null, mLock is not null.");
150 if (!SinkIsValid()) {
151 return NS_OK; // nothing to do
153 nsCOMPtr<nsIRunnable> event =
154 new nsInputStreamTeeWriteEvent(aBuf, aCount, mSink, this);
155 LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n", this,
156 aCount));
157 return mEventTarget->Dispatch(event, NS_DISPATCH_NORMAL);
158 } else { // synchronous case
159 NS_ASSERTION(!mEventTarget, "mEventTarget is not null, mLock is null.");
160 nsresult rv;
161 uint32_t totalBytesWritten = 0;
162 while (aCount) {
163 uint32_t bytesWritten = 0;
164 rv = mSink->Write(aBuf + totalBytesWritten, aCount, &bytesWritten);
165 if (NS_FAILED(rv)) {
166 // ok, this is not a fatal error... just drop our reference to mSink
167 // and continue on as if nothing happened.
168 NS_WARNING("Write failed (non-fatal)");
169 // catch possible misuse of the input stream tee
170 NS_ASSERTION(rv != NS_BASE_STREAM_WOULD_BLOCK,
171 "sink must be a blocking stream");
172 mSink = nullptr;
173 break;
175 totalBytesWritten += bytesWritten;
176 NS_ASSERTION(bytesWritten <= aCount, "wrote too much");
177 aCount -= bytesWritten;
179 return NS_OK;
183 nsresult nsInputStreamTee::WriteSegmentFun(nsIInputStream* aIn, void* aClosure,
184 const char* aFromSegment,
185 uint32_t aOffset, uint32_t aCount,
186 uint32_t* aWriteCount) {
187 nsInputStreamTee* tee = reinterpret_cast<nsInputStreamTee*>(aClosure);
188 nsresult rv = tee->mWriter(aIn, tee->mClosure, aFromSegment, aOffset, aCount,
189 aWriteCount);
190 if (NS_FAILED(rv) || (*aWriteCount == 0)) {
191 NS_ASSERTION((NS_FAILED(rv) ? (*aWriteCount == 0) : true),
192 "writer returned an error with non-zero writeCount");
193 return rv;
196 return tee->TeeSegment(aFromSegment, *aWriteCount);
199 NS_IMPL_ISUPPORTS(nsInputStreamTee, nsIInputStreamTee, nsIInputStream)
200 NS_IMETHODIMP
201 nsInputStreamTee::Close() {
202 if (NS_WARN_IF(!mSource)) {
203 return NS_ERROR_NOT_INITIALIZED;
205 nsresult rv = mSource->Close();
206 mSource = nullptr;
207 mSink = nullptr;
208 return rv;
211 NS_IMETHODIMP
212 nsInputStreamTee::Available(uint64_t* aAvail) {
213 if (NS_WARN_IF(!mSource)) {
214 return NS_ERROR_NOT_INITIALIZED;
216 return mSource->Available(aAvail);
219 NS_IMETHODIMP
220 nsInputStreamTee::StreamStatus() {
221 if (NS_WARN_IF(!mSource)) {
222 return NS_ERROR_NOT_INITIALIZED;
224 return mSource->StreamStatus();
227 NS_IMETHODIMP
228 nsInputStreamTee::Read(char* aBuf, uint32_t aCount, uint32_t* aBytesRead) {
229 if (NS_WARN_IF(!mSource)) {
230 return NS_ERROR_NOT_INITIALIZED;
233 nsresult rv = mSource->Read(aBuf, aCount, aBytesRead);
234 if (NS_FAILED(rv) || (*aBytesRead == 0)) {
235 return rv;
238 return TeeSegment(aBuf, *aBytesRead);
241 NS_IMETHODIMP
242 nsInputStreamTee::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
243 uint32_t aCount, uint32_t* aBytesRead) {
244 if (NS_WARN_IF(!mSource)) {
245 return NS_ERROR_NOT_INITIALIZED;
248 mWriter = aWriter;
249 mClosure = aClosure;
251 return mSource->ReadSegments(WriteSegmentFun, this, aCount, aBytesRead);
254 NS_IMETHODIMP
255 nsInputStreamTee::IsNonBlocking(bool* aResult) {
256 if (NS_WARN_IF(!mSource)) {
257 return NS_ERROR_NOT_INITIALIZED;
259 return mSource->IsNonBlocking(aResult);
262 NS_IMETHODIMP
263 nsInputStreamTee::SetSource(nsIInputStream* aSource) {
264 mSource = aSource;
265 return NS_OK;
268 NS_IMETHODIMP
269 nsInputStreamTee::GetSource(nsIInputStream** aSource) {
270 NS_IF_ADDREF(*aSource = mSource);
271 return NS_OK;
274 NS_IMETHODIMP
275 nsInputStreamTee::SetSink(nsIOutputStream* aSink) {
276 #ifdef DEBUG
277 if (aSink) {
278 bool nonBlocking;
279 nsresult rv = aSink->IsNonBlocking(&nonBlocking);
280 if (NS_FAILED(rv) || nonBlocking) {
281 NS_ERROR("aSink should be a blocking stream");
284 #endif
285 mSink = aSink;
286 return NS_OK;
289 NS_IMETHODIMP
290 nsInputStreamTee::GetSink(nsIOutputStream** aSink) {
291 NS_IF_ADDREF(*aSink = mSink);
292 return NS_OK;
295 NS_IMETHODIMP
296 nsInputStreamTee::SetEventTarget(nsIEventTarget* aEventTarget) {
297 mEventTarget = aEventTarget;
298 if (mEventTarget) {
299 // Only need synchronization if this is an async tee
300 mLock.emplace("nsInputStreamTee.mLock");
302 return NS_OK;
305 NS_IMETHODIMP
306 nsInputStreamTee::GetEventTarget(nsIEventTarget** aEventTarget) {
307 NS_IF_ADDREF(*aEventTarget = mEventTarget);
308 return NS_OK;
311 nsresult NS_NewInputStreamTeeAsync(nsIInputStream** aResult,
312 nsIInputStream* aSource,
313 nsIOutputStream* aSink,
314 nsIEventTarget* aEventTarget) {
315 nsresult rv;
317 nsCOMPtr<nsIInputStreamTee> tee = new nsInputStreamTee();
318 rv = tee->SetSource(aSource);
319 if (NS_FAILED(rv)) {
320 return rv;
323 rv = tee->SetSink(aSink);
324 if (NS_FAILED(rv)) {
325 return rv;
328 rv = tee->SetEventTarget(aEventTarget);
329 if (NS_FAILED(rv)) {
330 return rv;
333 tee.forget(aResult);
334 return rv;
337 nsresult NS_NewInputStreamTee(nsIInputStream** aResult, nsIInputStream* aSource,
338 nsIOutputStream* aSink) {
339 return NS_NewInputStreamTeeAsync(aResult, aSource, aSink, nullptr);
342 #undef LOG