Bug 1758813 [wpt PR 33142] - Implement RP sign out, a=testonly
[gecko.git] / ipc / glue / IPCStreamDestination.cpp
blob16653cd489be9330880a0b52c58b44ae19a53c38
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "IPCStreamDestination.h"
8 #include "mozilla/InputStreamLengthWrapper.h"
9 #include "mozilla/Mutex.h"
10 #include "nsIAsyncInputStream.h"
11 #include "nsIAsyncOutputStream.h"
12 #include "nsIBufferedStreams.h"
13 #include "nsICloneableInputStream.h"
14 #include "nsIPipe.h"
15 #include "nsThreadUtils.h"
16 #include "mozilla/webrender/WebRenderTypes.h"
18 namespace mozilla {
19 namespace ipc {
21 // ----------------------------------------------------------------------------
22 // IPCStreamDestination::DelayedStartInputStream
24 // When AutoIPCStream is used with delayedStart, we need to ask for data at the
25 // first real use of the nsIInputStream. In order to do so, we wrap the
26 // nsIInputStream, created by the nsIPipe, with this wrapper.
28 class IPCStreamDestination::DelayedStartInputStream final
29 : public nsIAsyncInputStream,
30 public nsIInputStreamCallback,
31 public nsISearchableInputStream,
32 public nsICloneableInputStream,
33 public nsIBufferedInputStream {
34 public:
35 NS_DECL_THREADSAFE_ISUPPORTS
37 DelayedStartInputStream(IPCStreamDestination* aDestination,
38 nsCOMPtr<nsIAsyncInputStream>&& aStream)
39 : mDestination(aDestination),
40 mStream(std::move(aStream)),
41 mMutex("IPCStreamDestination::DelayedStartInputStream::mMutex") {
42 MOZ_ASSERT(mDestination);
43 MOZ_ASSERT(mStream);
46 void DestinationShutdown() {
47 MutexAutoLock lock(mMutex);
48 mDestination = nullptr;
51 // nsIInputStream interface
53 NS_IMETHOD
54 Close() override {
55 MaybeCloseDestination();
56 return mStream->Close();
59 NS_IMETHOD
60 Available(uint64_t* aLength) override {
61 MaybeStartReading();
62 return mStream->Available(aLength);
65 NS_IMETHOD
66 Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
67 MaybeStartReading();
68 return mStream->Read(aBuffer, aCount, aReadCount);
71 NS_IMETHOD
72 ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
73 uint32_t* aResult) override {
74 MaybeStartReading();
75 return mStream->ReadSegments(aWriter, aClosure, aCount, aResult);
78 NS_IMETHOD
79 IsNonBlocking(bool* aNonBlocking) override {
80 MaybeStartReading();
81 return mStream->IsNonBlocking(aNonBlocking);
84 // nsIAsyncInputStream interface
86 NS_IMETHOD
87 CloseWithStatus(nsresult aReason) override {
88 MaybeCloseDestination();
89 return mStream->CloseWithStatus(aReason);
92 NS_IMETHOD
93 AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
94 uint32_t aRequestedCount, nsIEventTarget* aTarget) override {
96 MutexAutoLock lock(mMutex);
97 if (mAsyncWaitCallback && aCallback) {
98 return NS_ERROR_FAILURE;
101 mAsyncWaitCallback = aCallback;
103 MaybeStartReading(lock);
106 nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr;
107 return mStream->AsyncWait(callback, aFlags, aRequestedCount, aTarget);
110 NS_IMETHOD
111 Search(const char* aForString, bool aIgnoreCase, bool* aFound,
112 uint32_t* aOffsetSearchedTo) override {
113 MaybeStartReading();
114 nsCOMPtr<nsISearchableInputStream> searchable = do_QueryInterface(mStream);
115 MOZ_ASSERT(searchable);
116 return searchable->Search(aForString, aIgnoreCase, aFound,
117 aOffsetSearchedTo);
120 // nsICloneableInputStream interface
122 NS_IMETHOD
123 GetCloneable(bool* aCloneable) override {
124 MaybeStartReading();
125 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
126 MOZ_ASSERT(cloneable);
127 return cloneable->GetCloneable(aCloneable);
130 NS_IMETHOD
131 Clone(nsIInputStream** aResult) override {
132 MaybeStartReading();
133 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
134 MOZ_ASSERT(cloneable);
135 return cloneable->Clone(aResult);
138 // nsIBufferedInputStream
140 NS_IMETHOD
141 Init(nsIInputStream* aStream, uint32_t aBufferSize) override {
142 MaybeStartReading();
143 nsCOMPtr<nsIBufferedInputStream> stream = do_QueryInterface(mStream);
144 MOZ_ASSERT(stream);
145 return stream->Init(aStream, aBufferSize);
148 NS_IMETHODIMP
149 GetData(nsIInputStream** aResult) override {
150 return NS_ERROR_NOT_IMPLEMENTED;
153 // nsIInputStreamCallback
155 NS_IMETHOD
156 OnInputStreamReady(nsIAsyncInputStream* aStream) override {
157 nsCOMPtr<nsIInputStreamCallback> callback;
160 MutexAutoLock lock(mMutex);
162 // We have been canceled in the meanwhile.
163 if (!mAsyncWaitCallback) {
164 return NS_OK;
167 callback.swap(mAsyncWaitCallback);
170 callback->OnInputStreamReady(this);
171 return NS_OK;
174 void MaybeStartReading();
175 void MaybeStartReading(const MutexAutoLock& aProofOfLook);
177 void MaybeCloseDestination();
179 private:
180 ~DelayedStartInputStream() = default;
182 IPCStreamDestination* mDestination;
183 nsCOMPtr<nsIAsyncInputStream> mStream;
185 nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;
187 // This protects mDestination: any method can be called by any thread.
188 Mutex mMutex MOZ_UNANNOTATED;
190 class HelperRunnable;
193 class IPCStreamDestination::DelayedStartInputStream::HelperRunnable final
194 : public Runnable {
195 public:
196 enum Op {
197 eStartReading,
198 eCloseDestination,
201 HelperRunnable(
202 IPCStreamDestination::DelayedStartInputStream* aDelayedStartInputStream,
203 Op aOp)
204 : Runnable(
205 "ipc::IPCStreamDestination::DelayedStartInputStream::"
206 "HelperRunnable"),
207 mDelayedStartInputStream(aDelayedStartInputStream),
208 mOp(aOp) {
209 MOZ_ASSERT(aDelayedStartInputStream);
212 NS_IMETHOD
213 Run() override {
214 switch (mOp) {
215 case eStartReading:
216 mDelayedStartInputStream->MaybeStartReading();
217 break;
218 case eCloseDestination:
219 mDelayedStartInputStream->MaybeCloseDestination();
220 break;
223 return NS_OK;
226 private:
227 RefPtr<IPCStreamDestination::DelayedStartInputStream>
228 mDelayedStartInputStream;
229 Op mOp;
232 void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading() {
233 MutexAutoLock lock(mMutex);
234 MaybeStartReading(lock);
237 void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading(
238 const MutexAutoLock& aProofOfLook) {
239 if (!mDestination) {
240 return;
243 if (mDestination->IsOnOwningThread()) {
244 mDestination->StartReading();
245 mDestination = nullptr;
246 return;
249 RefPtr<Runnable> runnable =
250 new HelperRunnable(this, HelperRunnable::eStartReading);
251 mDestination->DispatchRunnable(runnable.forget());
254 void IPCStreamDestination::DelayedStartInputStream::MaybeCloseDestination() {
255 MutexAutoLock lock(mMutex);
256 if (!mDestination) {
257 return;
260 if (mDestination->IsOnOwningThread()) {
261 mDestination->RequestClose(NS_ERROR_ABORT);
262 mDestination = nullptr;
263 return;
266 RefPtr<Runnable> runnable =
267 new HelperRunnable(this, HelperRunnable::eCloseDestination);
268 mDestination->DispatchRunnable(runnable.forget());
271 NS_IMPL_ADDREF(IPCStreamDestination::DelayedStartInputStream);
272 NS_IMPL_RELEASE(IPCStreamDestination::DelayedStartInputStream);
274 NS_INTERFACE_MAP_BEGIN(IPCStreamDestination::DelayedStartInputStream)
275 NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
276 NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback)
277 NS_INTERFACE_MAP_ENTRY(nsISearchableInputStream)
278 NS_INTERFACE_MAP_ENTRY(nsICloneableInputStream)
279 NS_INTERFACE_MAP_ENTRY(nsIBufferedInputStream)
280 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsIInputStream, nsIAsyncInputStream)
281 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIAsyncInputStream)
282 NS_INTERFACE_MAP_END
284 // ----------------------------------------------------------------------------
285 // IPCStreamDestination
287 IPCStreamDestination::IPCStreamDestination()
288 : mOwningThread(NS_GetCurrentThread()),
289 mDelayedStart(false)
290 #ifdef MOZ_DEBUG
292 mLengthSet(false)
293 #endif
297 IPCStreamDestination::~IPCStreamDestination() = default;
299 nsresult IPCStreamDestination::Initialize() {
300 MOZ_ASSERT(!mReader);
301 MOZ_ASSERT(!mWriter);
303 // use async versions for both reader and writer even though we are
304 // opening the writer as an infinite stream. We want to be able to
305 // use CloseWithStatus() to communicate errors through the pipe.
307 // Use an "infinite" pipe because we cannot apply back-pressure through
308 // the async IPC layer at the moment. Blocking the IPC worker thread
309 // is not desirable, either.
310 nsresult rv = NS_NewPipe2(getter_AddRefs(mReader), getter_AddRefs(mWriter),
311 true, true, // non-blocking
312 0, // segment size
313 UINT32_MAX); // "infinite" pipe
314 if (NS_WARN_IF(NS_FAILED(rv))) {
315 return rv;
318 return NS_OK;
321 void IPCStreamDestination::SetDelayedStart(bool aDelayedStart) {
322 mDelayedStart = aDelayedStart;
325 void IPCStreamDestination::SetLength(int64_t aLength) {
326 MOZ_ASSERT(mReader);
327 MOZ_ASSERT(!mLengthSet);
329 #ifdef DEBUG
330 mLengthSet = true;
331 #endif
333 mLength = aLength;
336 already_AddRefed<nsIInputStream> IPCStreamDestination::TakeReader() {
337 MOZ_ASSERT(mReader);
338 MOZ_ASSERT(!mDelayedStartInputStream);
340 nsCOMPtr<nsIAsyncInputStream> reader{mReader.forget()};
341 if (mDelayedStart) {
342 mDelayedStartInputStream =
343 new DelayedStartInputStream(this, std::move(reader));
344 reader = mDelayedStartInputStream;
345 MOZ_ASSERT(reader);
348 if (mLength != -1) {
349 MOZ_ASSERT(mLengthSet);
350 nsCOMPtr<nsIInputStream> finalStream =
351 new InputStreamLengthWrapper(reader.forget(), mLength);
352 reader = do_QueryInterface(finalStream);
353 MOZ_ASSERT(reader);
356 return reader.forget();
359 bool IPCStreamDestination::IsOnOwningThread() const {
360 return mOwningThread == NS_GetCurrentThread();
363 void IPCStreamDestination::DispatchRunnable(
364 already_AddRefed<nsIRunnable>&& aRunnable) {
365 nsCOMPtr<nsIRunnable> runnable = aRunnable;
366 mOwningThread->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
369 void IPCStreamDestination::ActorDestroyed() {
370 MOZ_ASSERT(mWriter);
372 // If we were gracefully closed we should have gotten RecvClose(). In
373 // that case, the writer will already be closed and this will have no
374 // effect. This just aborts the writer in the case where the child process
375 // crashes.
376 mWriter->CloseWithStatus(NS_ERROR_ABORT);
378 if (mDelayedStartInputStream) {
379 mDelayedStartInputStream->DestinationShutdown();
380 mDelayedStartInputStream = nullptr;
384 void IPCStreamDestination::BufferReceived(const wr::ByteBuffer& aBuffer) {
385 MOZ_ASSERT(mWriter);
387 uint32_t numWritten = 0;
389 // This should only fail if we hit an OOM condition.
390 nsresult rv = mWriter->Write(reinterpret_cast<char*>(aBuffer.mData),
391 aBuffer.mLength, &numWritten);
392 if (NS_WARN_IF(NS_FAILED(rv))) {
393 RequestClose(rv);
397 void IPCStreamDestination::CloseReceived(nsresult aRv) {
398 MOZ_ASSERT(mWriter);
399 mWriter->CloseWithStatus(aRv);
400 TerminateDestination();
403 } // namespace ipc
404 } // namespace mozilla