1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "IPCStreamDestination.h"
8 #include "mozilla/InputStreamLengthWrapper.h"
9 #include "mozilla/Mutex.h"
10 #include "nsIAsyncInputStream.h"
11 #include "nsIAsyncOutputStream.h"
12 #include "nsIBufferedStreams.h"
13 #include "nsICloneableInputStream.h"
15 #include "nsThreadUtils.h"
16 #include "mozilla/webrender/WebRenderTypes.h"
21 // ----------------------------------------------------------------------------
22 // IPCStreamDestination::DelayedStartInputStream
24 // When AutoIPCStream is used with delayedStart, we need to ask for data at the
25 // first real use of the nsIInputStream. In order to do so, we wrap the
26 // nsIInputStream, created by the nsIPipe, with this wrapper.
28 class IPCStreamDestination::DelayedStartInputStream final
29 : public nsIAsyncInputStream
,
30 public nsIInputStreamCallback
,
31 public nsISearchableInputStream
,
32 public nsICloneableInputStream
,
33 public nsIBufferedInputStream
{
35 NS_DECL_THREADSAFE_ISUPPORTS
37 DelayedStartInputStream(IPCStreamDestination
* aDestination
,
38 nsCOMPtr
<nsIAsyncInputStream
>&& aStream
)
39 : mDestination(aDestination
),
40 mStream(std::move(aStream
)),
41 mMutex("IPCStreamDestination::DelayedStartInputStream::mMutex") {
42 MOZ_ASSERT(mDestination
);
46 void DestinationShutdown() {
47 MutexAutoLock
lock(mMutex
);
48 mDestination
= nullptr;
51 // nsIInputStream interface
55 MaybeCloseDestination();
56 return mStream
->Close();
60 Available(uint64_t* aLength
) override
{
62 return mStream
->Available(aLength
);
66 Read(char* aBuffer
, uint32_t aCount
, uint32_t* aReadCount
) override
{
68 return mStream
->Read(aBuffer
, aCount
, aReadCount
);
72 ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
, uint32_t aCount
,
73 uint32_t* aResult
) override
{
75 return mStream
->ReadSegments(aWriter
, aClosure
, aCount
, aResult
);
79 IsNonBlocking(bool* aNonBlocking
) override
{
81 return mStream
->IsNonBlocking(aNonBlocking
);
84 // nsIAsyncInputStream interface
87 CloseWithStatus(nsresult aReason
) override
{
88 MaybeCloseDestination();
89 return mStream
->CloseWithStatus(aReason
);
93 AsyncWait(nsIInputStreamCallback
* aCallback
, uint32_t aFlags
,
94 uint32_t aRequestedCount
, nsIEventTarget
* aTarget
) override
{
96 MutexAutoLock
lock(mMutex
);
97 if (mAsyncWaitCallback
&& aCallback
) {
98 return NS_ERROR_FAILURE
;
101 mAsyncWaitCallback
= aCallback
;
103 MaybeStartReading(lock
);
106 nsCOMPtr
<nsIInputStreamCallback
> callback
= aCallback
? this : nullptr;
107 return mStream
->AsyncWait(callback
, aFlags
, aRequestedCount
, aTarget
);
111 Search(const char* aForString
, bool aIgnoreCase
, bool* aFound
,
112 uint32_t* aOffsetSearchedTo
) override
{
114 nsCOMPtr
<nsISearchableInputStream
> searchable
= do_QueryInterface(mStream
);
115 MOZ_ASSERT(searchable
);
116 return searchable
->Search(aForString
, aIgnoreCase
, aFound
,
120 // nsICloneableInputStream interface
123 GetCloneable(bool* aCloneable
) override
{
125 nsCOMPtr
<nsICloneableInputStream
> cloneable
= do_QueryInterface(mStream
);
126 MOZ_ASSERT(cloneable
);
127 return cloneable
->GetCloneable(aCloneable
);
131 Clone(nsIInputStream
** aResult
) override
{
133 nsCOMPtr
<nsICloneableInputStream
> cloneable
= do_QueryInterface(mStream
);
134 MOZ_ASSERT(cloneable
);
135 return cloneable
->Clone(aResult
);
138 // nsIBufferedInputStream
141 Init(nsIInputStream
* aStream
, uint32_t aBufferSize
) override
{
143 nsCOMPtr
<nsIBufferedInputStream
> stream
= do_QueryInterface(mStream
);
145 return stream
->Init(aStream
, aBufferSize
);
149 GetData(nsIInputStream
** aResult
) override
{
150 return NS_ERROR_NOT_IMPLEMENTED
;
153 // nsIInputStreamCallback
156 OnInputStreamReady(nsIAsyncInputStream
* aStream
) override
{
157 nsCOMPtr
<nsIInputStreamCallback
> callback
;
160 MutexAutoLock
lock(mMutex
);
162 // We have been canceled in the meanwhile.
163 if (!mAsyncWaitCallback
) {
167 callback
.swap(mAsyncWaitCallback
);
170 callback
->OnInputStreamReady(this);
174 void MaybeStartReading();
175 void MaybeStartReading(const MutexAutoLock
& aProofOfLook
);
177 void MaybeCloseDestination();
180 ~DelayedStartInputStream() = default;
182 IPCStreamDestination
* mDestination
;
183 nsCOMPtr
<nsIAsyncInputStream
> mStream
;
185 nsCOMPtr
<nsIInputStreamCallback
> mAsyncWaitCallback
;
187 // This protects mDestination: any method can be called by any thread.
188 Mutex mMutex MOZ_UNANNOTATED
;
190 class HelperRunnable
;
193 class IPCStreamDestination::DelayedStartInputStream::HelperRunnable final
202 IPCStreamDestination::DelayedStartInputStream
* aDelayedStartInputStream
,
205 "ipc::IPCStreamDestination::DelayedStartInputStream::"
207 mDelayedStartInputStream(aDelayedStartInputStream
),
209 MOZ_ASSERT(aDelayedStartInputStream
);
216 mDelayedStartInputStream
->MaybeStartReading();
218 case eCloseDestination
:
219 mDelayedStartInputStream
->MaybeCloseDestination();
227 RefPtr
<IPCStreamDestination::DelayedStartInputStream
>
228 mDelayedStartInputStream
;
232 void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading() {
233 MutexAutoLock
lock(mMutex
);
234 MaybeStartReading(lock
);
237 void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading(
238 const MutexAutoLock
& aProofOfLook
) {
243 if (mDestination
->IsOnOwningThread()) {
244 mDestination
->StartReading();
245 mDestination
= nullptr;
249 RefPtr
<Runnable
> runnable
=
250 new HelperRunnable(this, HelperRunnable::eStartReading
);
251 mDestination
->DispatchRunnable(runnable
.forget());
254 void IPCStreamDestination::DelayedStartInputStream::MaybeCloseDestination() {
255 MutexAutoLock
lock(mMutex
);
260 if (mDestination
->IsOnOwningThread()) {
261 mDestination
->RequestClose(NS_ERROR_ABORT
);
262 mDestination
= nullptr;
266 RefPtr
<Runnable
> runnable
=
267 new HelperRunnable(this, HelperRunnable::eCloseDestination
);
268 mDestination
->DispatchRunnable(runnable
.forget());
271 NS_IMPL_ADDREF(IPCStreamDestination::DelayedStartInputStream
);
272 NS_IMPL_RELEASE(IPCStreamDestination::DelayedStartInputStream
);
274 NS_INTERFACE_MAP_BEGIN(IPCStreamDestination::DelayedStartInputStream
)
275 NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream
)
276 NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback
)
277 NS_INTERFACE_MAP_ENTRY(nsISearchableInputStream
)
278 NS_INTERFACE_MAP_ENTRY(nsICloneableInputStream
)
279 NS_INTERFACE_MAP_ENTRY(nsIBufferedInputStream
)
280 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsIInputStream
, nsIAsyncInputStream
)
281 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports
, nsIAsyncInputStream
)
284 // ----------------------------------------------------------------------------
285 // IPCStreamDestination
287 IPCStreamDestination::IPCStreamDestination()
288 : mOwningThread(NS_GetCurrentThread()),
297 IPCStreamDestination::~IPCStreamDestination() = default;
299 nsresult
IPCStreamDestination::Initialize() {
300 MOZ_ASSERT(!mReader
);
301 MOZ_ASSERT(!mWriter
);
303 // use async versions for both reader and writer even though we are
304 // opening the writer as an infinite stream. We want to be able to
305 // use CloseWithStatus() to communicate errors through the pipe.
307 // Use an "infinite" pipe because we cannot apply back-pressure through
308 // the async IPC layer at the moment. Blocking the IPC worker thread
309 // is not desirable, either.
310 nsresult rv
= NS_NewPipe2(getter_AddRefs(mReader
), getter_AddRefs(mWriter
),
311 true, true, // non-blocking
313 UINT32_MAX
); // "infinite" pipe
314 if (NS_WARN_IF(NS_FAILED(rv
))) {
321 void IPCStreamDestination::SetDelayedStart(bool aDelayedStart
) {
322 mDelayedStart
= aDelayedStart
;
325 void IPCStreamDestination::SetLength(int64_t aLength
) {
327 MOZ_ASSERT(!mLengthSet
);
336 already_AddRefed
<nsIInputStream
> IPCStreamDestination::TakeReader() {
338 MOZ_ASSERT(!mDelayedStartInputStream
);
340 nsCOMPtr
<nsIAsyncInputStream
> reader
{mReader
.forget()};
342 mDelayedStartInputStream
=
343 new DelayedStartInputStream(this, std::move(reader
));
344 reader
= mDelayedStartInputStream
;
349 MOZ_ASSERT(mLengthSet
);
350 nsCOMPtr
<nsIInputStream
> finalStream
=
351 new InputStreamLengthWrapper(reader
.forget(), mLength
);
352 reader
= do_QueryInterface(finalStream
);
356 return reader
.forget();
359 bool IPCStreamDestination::IsOnOwningThread() const {
360 return mOwningThread
== NS_GetCurrentThread();
363 void IPCStreamDestination::DispatchRunnable(
364 already_AddRefed
<nsIRunnable
>&& aRunnable
) {
365 nsCOMPtr
<nsIRunnable
> runnable
= aRunnable
;
366 mOwningThread
->Dispatch(runnable
.forget(), NS_DISPATCH_NORMAL
);
369 void IPCStreamDestination::ActorDestroyed() {
372 // If we were gracefully closed we should have gotten RecvClose(). In
373 // that case, the writer will already be closed and this will have no
374 // effect. This just aborts the writer in the case where the child process
376 mWriter
->CloseWithStatus(NS_ERROR_ABORT
);
378 if (mDelayedStartInputStream
) {
379 mDelayedStartInputStream
->DestinationShutdown();
380 mDelayedStartInputStream
= nullptr;
384 void IPCStreamDestination::BufferReceived(const wr::ByteBuffer
& aBuffer
) {
387 uint32_t numWritten
= 0;
389 // This should only fail if we hit an OOM condition.
390 nsresult rv
= mWriter
->Write(reinterpret_cast<char*>(aBuffer
.mData
),
391 aBuffer
.mLength
, &numWritten
);
392 if (NS_WARN_IF(NS_FAILED(rv
))) {
397 void IPCStreamDestination::CloseReceived(nsresult aRv
) {
399 mWriter
->CloseWithStatus(aRv
);
400 TerminateDestination();
404 } // namespace mozilla