1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "IPCStreamSource.h"
9 #include "BackgroundParent.h" // for AssertIsOnBackgroundThread
10 #include "mozilla/UniquePtr.h"
11 #include "mozilla/dom/RemoteWorkerService.h"
12 #include "mozilla/dom/WorkerCommon.h"
13 #include "mozilla/webrender/WebRenderTypes.h"
14 #include "nsIAsyncInputStream.h"
15 #include "nsICancelableRunnable.h"
16 #include "nsIRunnable.h"
17 #include "nsISerialEventTarget.h"
18 #include "nsStreamUtils.h"
19 #include "nsThreadUtils.h"
20 #include "nsIThread.h"
22 using mozilla::wr::ByteBuffer
;
27 class IPCStreamSource::Callback final
: public DiscardableRunnable
,
28 public nsIInputStreamCallback
{
30 explicit Callback(IPCStreamSource
* aSource
)
31 : DiscardableRunnable("IPCStreamSource::Callback"),
33 mOwningEventTarget(GetCurrentSerialEventTarget()) {
38 OnInputStreamReady(nsIAsyncInputStream
* aStream
) override
{
40 if (mOwningEventTarget
->IsOnCurrentThread()) {
44 // If this fails, then it means the owning thread is a Worker that has
45 // been shut down. It's OK to lose the event in this case because the
46 // IPCStreamChild listens for this event through the WorkerRef.
48 mOwningEventTarget
->Dispatch(this, nsIThread::DISPATCH_NORMAL
);
50 NS_WARNING("Failed to dispatch stream readable event to owning thread");
58 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
60 mSource
->OnStreamReady(this);
65 // OnDiscard() gets called when the Worker thread is being shut down. We have
66 // nothing to do here because IPCStreamChild handles this case via
70 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
77 // called on any thread
79 // ClearSource() should be called before the Callback is destroyed
83 // This is a raw pointer because the source keeps alive the callback and,
84 // before being destroyed, it nullifies this pointer (this happens when
85 // ActorDestroyed() is called).
86 IPCStreamSource
* mSource
;
88 nsCOMPtr
<nsISerialEventTarget
> mOwningEventTarget
;
90 NS_DECL_ISUPPORTS_INHERITED
93 NS_IMPL_ISUPPORTS_INHERITED(IPCStreamSource::Callback
, DiscardableRunnable
,
94 nsIInputStreamCallback
);
96 IPCStreamSource::IPCStreamSource(nsIAsyncInputStream
* aInputStream
)
97 : mStream(aInputStream
), mState(ePending
) {
98 MOZ_ASSERT(aInputStream
);
101 IPCStreamSource::~IPCStreamSource() {
102 NS_ASSERT_OWNINGTHREAD(IPCStreamSource
);
103 MOZ_ASSERT(mState
== eClosed
);
104 MOZ_ASSERT(!mCallback
);
105 MOZ_ASSERT(!mWorkerRef
);
108 bool IPCStreamSource::Initialize() {
109 bool nonBlocking
= false;
110 MOZ_ALWAYS_TRUE(NS_SUCCEEDED(mStream
->IsNonBlocking(&nonBlocking
)));
111 // IPCStreamChild reads in the current thread, so it is only supported on
112 // non-blocking, async channels
117 // A source can be used on any thread, but we only support IPCStream on
118 // main thread, Workers, Worker Launcher, and PBackground thread right now.
119 // This is due to the requirement that the thread be guaranteed to live long
120 // enough to receive messages. We can enforce this guarantee with a
121 // StrongWorkerRef on worker threads, but not other threads. Main-thread,
122 // PBackground, and Worker Launcher threads do not need anything special in
123 // order to be kept alive.
124 if (!NS_IsMainThread()) {
125 if (const auto workerPrivate
= dom::GetCurrentThreadWorkerPrivate()) {
126 RefPtr
<dom::StrongWorkerRef
> workerRef
=
127 dom::StrongWorkerRef::CreateForcibly(workerPrivate
,
129 if (NS_WARN_IF(!workerRef
)) {
133 mWorkerRef
= std::move(workerRef
);
135 MOZ_DIAGNOSTIC_ASSERT(
136 IsOnBackgroundThread() ||
137 dom::RemoteWorkerService::Thread()->IsOnCurrentThread());
144 void IPCStreamSource::ActorConstructed() {
145 MOZ_ASSERT(mState
== ePending
);
146 mState
= eActorConstructed
;
149 void IPCStreamSource::ActorDestroyed() {
150 NS_ASSERT_OWNINGTHREAD(IPCStreamSource
);
155 mCallback
->ClearSource();
159 mWorkerRef
= nullptr;
162 void IPCStreamSource::Start() {
163 NS_ASSERT_OWNINGTHREAD(IPCStreamSource
);
167 void IPCStreamSource::StartDestroy() {
168 NS_ASSERT_OWNINGTHREAD(IPCStreamSource
);
169 OnEnd(NS_ERROR_ABORT
);
172 void IPCStreamSource::DoRead() {
173 NS_ASSERT_OWNINGTHREAD(IPCStreamSource
);
174 MOZ_ASSERT(mState
== eActorConstructed
);
175 MOZ_ASSERT(!mCallback
);
177 // The input stream (likely a pipe) probably uses a segment size of
178 // 4kb. If there is data already buffered it would be nice to aggregate
179 // multiple segments into a single IPC call. Conversely, don't send
180 // too large of a buffer in a single call to avoid spiking memory.
181 static const uint64_t kMaxBytesPerMessage
= 32 * 1024;
182 static_assert(kMaxBytesPerMessage
<= static_cast<uint64_t>(UINT32_MAX
),
183 "kMaxBytesPerMessage must cleanly cast to uint32_t");
185 UniquePtr
<char[]> buffer(new char[kMaxBytesPerMessage
]);
188 // It should not be possible to transition to closed state without
189 // this loop terminating via a return.
190 MOZ_ASSERT(mState
== eActorConstructed
);
192 // See if the stream is closed by checking the return of Available.
194 nsresult rv
= mStream
->Available(&dummy
);
200 uint32_t bytesRead
= 0;
201 rv
= mStream
->Read(buffer
.get(), kMaxBytesPerMessage
, &bytesRead
);
203 if (rv
== NS_BASE_STREAM_WOULD_BLOCK
) {
204 MOZ_ASSERT(bytesRead
== 0);
210 MOZ_ASSERT(bytesRead
== 0);
215 // Zero-byte read indicates end-of-stream.
216 if (bytesRead
== 0) {
217 OnEnd(NS_BASE_STREAM_CLOSED
);
221 // We read some data from the stream, send it across.
222 SendData(ByteBuffer(bytesRead
, reinterpret_cast<uint8_t*>(buffer
.get())));
226 void IPCStreamSource::Wait() {
227 NS_ASSERT_OWNINGTHREAD(IPCStreamSource
);
228 MOZ_ASSERT(mState
== eActorConstructed
);
229 MOZ_ASSERT(!mCallback
);
231 // Set mCallback immediately instead of waiting for success. It's possible
232 // AsyncWait() will call back synchronously.
233 mCallback
= new Callback(this);
234 nsresult rv
= mStream
->AsyncWait(mCallback
, 0, 0, nullptr);
241 void IPCStreamSource::OnStreamReady(Callback
* aCallback
) {
242 NS_ASSERT_OWNINGTHREAD(IPCStreamSource
);
243 MOZ_ASSERT(mCallback
);
244 MOZ_ASSERT(aCallback
== mCallback
);
245 mCallback
->ClearSource();
248 // Possibly closed if this callback is (indirectly) called by
249 // IPCStreamSourceParent::RecvRequestClose().
250 if (mState
== eClosed
) {
257 void IPCStreamSource::OnEnd(nsresult aRv
) {
258 NS_ASSERT_OWNINGTHREAD(IPCStreamSource
);
259 MOZ_ASSERT(aRv
!= NS_BASE_STREAM_WOULD_BLOCK
);
261 if (mState
== eClosed
) {
267 mStream
->CloseWithStatus(aRv
);
269 if (aRv
== NS_BASE_STREAM_CLOSED
) {
273 // This will trigger an ActorDestroy() from the other side
278 } // namespace mozilla