Bug 1758813 [wpt PR 33142] - Implement RP sign out, a=testonly
[gecko.git] / ipc / glue / IPCStreamSource.cpp
blobeb967311634818eb9dbed502881b021f1cbb8e70
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "IPCStreamSource.h"
9 #include "BackgroundParent.h" // for AssertIsOnBackgroundThread
10 #include "mozilla/UniquePtr.h"
11 #include "mozilla/dom/RemoteWorkerService.h"
12 #include "mozilla/dom/WorkerCommon.h"
13 #include "mozilla/webrender/WebRenderTypes.h"
14 #include "nsIAsyncInputStream.h"
15 #include "nsICancelableRunnable.h"
16 #include "nsIRunnable.h"
17 #include "nsISerialEventTarget.h"
18 #include "nsStreamUtils.h"
19 #include "nsThreadUtils.h"
20 #include "nsIThread.h"
22 using mozilla::wr::ByteBuffer;
24 namespace mozilla {
25 namespace ipc {
27 class IPCStreamSource::Callback final : public DiscardableRunnable,
28 public nsIInputStreamCallback {
29 public:
30 explicit Callback(IPCStreamSource* aSource)
31 : DiscardableRunnable("IPCStreamSource::Callback"),
32 mSource(aSource),
33 mOwningEventTarget(GetCurrentSerialEventTarget()) {
34 MOZ_ASSERT(mSource);
37 NS_IMETHOD
38 OnInputStreamReady(nsIAsyncInputStream* aStream) override {
39 // any thread
40 if (mOwningEventTarget->IsOnCurrentThread()) {
41 return Run();
44 // If this fails, then it means the owning thread is a Worker that has
45 // been shutdown. Its ok to lose the event in this case because the
46 // IPCStreamChild listens for this event through the WorkerRef.
47 nsresult rv =
48 mOwningEventTarget->Dispatch(this, nsIThread::DISPATCH_NORMAL);
49 if (NS_FAILED(rv)) {
50 NS_WARNING("Failed to dispatch stream readable event to owning thread");
53 return NS_OK;
56 NS_IMETHOD
57 Run() override {
58 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
59 if (mSource) {
60 mSource->OnStreamReady(this);
62 return NS_OK;
65 // OnDiscard() gets called when the Worker thread is being shutdown. We have
66 // nothing to do here because IPCStreamChild handles this case via
67 // the WorkerRef.
69 void ClearSource() {
70 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
71 MOZ_ASSERT(mSource);
72 mSource = nullptr;
75 private:
76 ~Callback() {
77 // called on any thread
79 // ClearSource() should be called before the Callback is destroyed
80 MOZ_ASSERT(!mSource);
83 // This is a raw pointer because the source keeps alive the callback and,
84 // before beeing destroyed, it nullifies this pointer (this happens when
85 // ActorDestroyed() is called).
86 IPCStreamSource* mSource;
88 nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;
90 NS_DECL_ISUPPORTS_INHERITED
93 NS_IMPL_ISUPPORTS_INHERITED(IPCStreamSource::Callback, DiscardableRunnable,
94 nsIInputStreamCallback);
96 IPCStreamSource::IPCStreamSource(nsIAsyncInputStream* aInputStream)
97 : mStream(aInputStream), mState(ePending) {
98 MOZ_ASSERT(aInputStream);
101 IPCStreamSource::~IPCStreamSource() {
102 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
103 MOZ_ASSERT(mState == eClosed);
104 MOZ_ASSERT(!mCallback);
105 MOZ_ASSERT(!mWorkerRef);
108 bool IPCStreamSource::Initialize() {
109 bool nonBlocking = false;
110 MOZ_ALWAYS_TRUE(NS_SUCCEEDED(mStream->IsNonBlocking(&nonBlocking)));
111 // IPCStreamChild reads in the current thread, so it is only supported on
112 // non-blocking, async channels
113 if (!nonBlocking) {
114 return false;
117 // A source can be used on any thread, but we only support IPCStream on
118 // main thread, Workers, Worker Launcher, and PBackground thread right now.
119 // This is due to the requirement that the thread be guaranteed to live long
120 // enough to receive messages. We can enforce this guarantee with a
121 // StrongWorkerRef on worker threads, but not other threads. Main-thread,
122 // PBackground, and Worker Launcher threads do not need anything special in
123 // order to be kept alive.
124 if (!NS_IsMainThread()) {
125 if (const auto workerPrivate = dom::GetCurrentThreadWorkerPrivate()) {
126 RefPtr<dom::StrongWorkerRef> workerRef =
127 dom::StrongWorkerRef::CreateForcibly(workerPrivate,
128 "IPCStreamSource");
129 if (NS_WARN_IF(!workerRef)) {
130 return false;
133 mWorkerRef = std::move(workerRef);
134 } else {
135 MOZ_DIAGNOSTIC_ASSERT(
136 IsOnBackgroundThread() ||
137 dom::RemoteWorkerService::Thread()->IsOnCurrentThread());
141 return true;
144 void IPCStreamSource::ActorConstructed() {
145 MOZ_ASSERT(mState == ePending);
146 mState = eActorConstructed;
149 void IPCStreamSource::ActorDestroyed() {
150 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
152 mState = eClosed;
154 if (mCallback) {
155 mCallback->ClearSource();
156 mCallback = nullptr;
159 mWorkerRef = nullptr;
162 void IPCStreamSource::Start() {
163 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
164 DoRead();
167 void IPCStreamSource::StartDestroy() {
168 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
169 OnEnd(NS_ERROR_ABORT);
172 void IPCStreamSource::DoRead() {
173 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
174 MOZ_ASSERT(mState == eActorConstructed);
175 MOZ_ASSERT(!mCallback);
177 // The input stream (likely a pipe) probably uses a segment size of
178 // 4kb. If there is data already buffered it would be nice to aggregate
179 // multiple segments into a single IPC call. Conversely, don't send too
180 // too large of a buffer in a single call to avoid spiking memory.
181 static const uint64_t kMaxBytesPerMessage = 32 * 1024;
182 static_assert(kMaxBytesPerMessage <= static_cast<uint64_t>(UINT32_MAX),
183 "kMaxBytesPerMessage must cleanly cast to uint32_t");
185 UniquePtr<char[]> buffer(new char[kMaxBytesPerMessage]);
187 while (true) {
188 // It should not be possible to transition to closed state without
189 // this loop terminating via a return.
190 MOZ_ASSERT(mState == eActorConstructed);
192 // See if the stream is closed by checking the return of Available.
193 uint64_t dummy;
194 nsresult rv = mStream->Available(&dummy);
195 if (NS_FAILED(rv)) {
196 OnEnd(rv);
197 return;
200 uint32_t bytesRead = 0;
201 rv = mStream->Read(buffer.get(), kMaxBytesPerMessage, &bytesRead);
203 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
204 MOZ_ASSERT(bytesRead == 0);
205 Wait();
206 return;
209 if (NS_FAILED(rv)) {
210 MOZ_ASSERT(bytesRead == 0);
211 OnEnd(rv);
212 return;
215 // Zero-byte read indicates end-of-stream.
216 if (bytesRead == 0) {
217 OnEnd(NS_BASE_STREAM_CLOSED);
218 return;
221 // We read some data from the stream, send it across.
222 SendData(ByteBuffer(bytesRead, reinterpret_cast<uint8_t*>(buffer.get())));
226 void IPCStreamSource::Wait() {
227 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
228 MOZ_ASSERT(mState == eActorConstructed);
229 MOZ_ASSERT(!mCallback);
231 // Set mCallback immediately instead of waiting for success. Its possible
232 // AsyncWait() will callback synchronously.
233 mCallback = new Callback(this);
234 nsresult rv = mStream->AsyncWait(mCallback, 0, 0, nullptr);
235 if (NS_FAILED(rv)) {
236 OnEnd(rv);
237 return;
241 void IPCStreamSource::OnStreamReady(Callback* aCallback) {
242 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
243 MOZ_ASSERT(mCallback);
244 MOZ_ASSERT(aCallback == mCallback);
245 mCallback->ClearSource();
246 mCallback = nullptr;
248 // Possibly closed if this callback is (indirectly) called by
249 // IPCStreamSourceParent::RecvRequestClose().
250 if (mState == eClosed) {
251 return;
254 DoRead();
257 void IPCStreamSource::OnEnd(nsresult aRv) {
258 NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
259 MOZ_ASSERT(aRv != NS_BASE_STREAM_WOULD_BLOCK);
261 if (mState == eClosed) {
262 return;
265 mState = eClosed;
267 mStream->CloseWithStatus(aRv);
269 if (aRv == NS_BASE_STREAM_CLOSED) {
270 aRv = NS_OK;
273 // This will trigger an ActorDestroy() from the other side
274 Close(aRv);
277 } // namespace ipc
278 } // namespace mozilla