Bug 1858509 add thread-safety annotations around MediaSourceDemuxer::mMonitor r=alwu
[gecko.git] / ipc / glue / MessageChannel.cpp
bloba7fc1357489588c25bdbc5beb06103da5b5e524c
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 * vim: sw=2 ts=4 et :
3 */
4 /* This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 #include "mozilla/ipc/MessageChannel.h"
10 #include <math.h>
12 #include <utility>
14 #include "CrashAnnotations.h"
15 #include "mozilla/Assertions.h"
16 #include "mozilla/CycleCollectedJSContext.h"
17 #include "mozilla/DebugOnly.h"
18 #include "mozilla/Fuzzing.h"
19 #include "mozilla/IntentionalCrash.h"
20 #include "mozilla/Logging.h"
21 #include "mozilla/Monitor.h"
22 #include "mozilla/Mutex.h"
23 #include "mozilla/ProfilerMarkers.h"
24 #include "mozilla/ScopeExit.h"
25 #include "mozilla/Sprintf.h"
26 #include "mozilla/StaticMutex.h"
27 #include "mozilla/Telemetry.h"
28 #include "mozilla/TimeStamp.h"
29 #include "mozilla/UniquePtrExtensions.h"
30 #include "mozilla/dom/ScriptSettings.h"
31 #include "mozilla/ipc/NodeController.h"
32 #include "mozilla/ipc/ProcessChild.h"
33 #include "mozilla/ipc/ProtocolUtils.h"
34 #include "nsAppRunner.h"
35 #include "nsContentUtils.h"
36 #include "nsIDirectTaskDispatcher.h"
37 #include "nsTHashMap.h"
38 #include "nsDebug.h"
39 #include "nsExceptionHandler.h"
40 #include "nsIMemoryReporter.h"
41 #include "nsISupportsImpl.h"
42 #include "nsPrintfCString.h"
43 #include "nsThreadUtils.h"
45 #ifdef XP_WIN
46 # include "mozilla/gfx/Logging.h"
47 #endif
49 #ifdef FUZZING_SNAPSHOT
50 # include "mozilla/fuzzing/IPCFuzzController.h"
51 #endif
53 // Undo the damage done by mozzconf.h
54 #undef compress
56 static mozilla::LazyLogModule sLogModule("ipc");
57 #define IPC_LOG(...) MOZ_LOG(sLogModule, LogLevel::Debug, (__VA_ARGS__))
60 * IPC design:
62 * There are two kinds of messages: async and sync. Sync messages are blocking.
64 * Terminology: To dispatch a message Foo is to run the RecvFoo code for
65 * it. This is also called "handling" the message.
67 * Sync and async messages can sometimes "nest" inside other sync messages
68 * (i.e., while waiting for the sync reply, we can dispatch the inner
69 * message). The three possible nesting levels are NOT_NESTED,
70 * NESTED_INSIDE_SYNC, and NESTED_INSIDE_CPOW. The intended uses are:
71 * NOT_NESTED - most messages.
72 * NESTED_INSIDE_SYNC - CPOW-related messages, which are always sync
73 * and can go in either direction.
74 * NESTED_INSIDE_CPOW - messages where we don't want to dispatch
75 * incoming CPOWs while waiting for the response.
76 * These nesting levels are ordered: NOT_NESTED, NESTED_INSIDE_SYNC,
77 * NESTED_INSIDE_CPOW. Async messages cannot be NESTED_INSIDE_SYNC but they can
78 * be NESTED_INSIDE_CPOW.
80 * To avoid jank, the parent process is not allowed to send NOT_NESTED sync
81 * messages. When a process is waiting for a response to a sync message M0, it
82 * will dispatch an incoming message M if:
83 * 1. M has a higher nesting level than M0, or
84 * 2. if M has the same nesting level as M0 and we're in the child, or
85 * 3. if M has the same nesting level as M0 and it was sent by the other side
86 * while dispatching M0.
87 * The idea is that messages with higher nesting should take precendence. The
88 * purpose of rule 2 is to handle a race where both processes send to each other
89 * simultaneously. In this case, we resolve the race in favor of the parent (so
90 * the child dispatches first).
92 * Messages satisfy the following properties:
93 * A. When waiting for a response to a sync message, we won't dispatch any
94 * messages of a lower nesting level.
95 * B. Messages of the same nesting level will be dispatched roughly in the
96 * order they were sent. The exception is when the parent and child send
97 * sync messages to each other simulataneously. In this case, the parent's
98 * message is dispatched first. While it is dispatched, the child may send
99 * further nested messages, and these messages may be dispatched before the
100 * child's original message. We can consider ordering to be preserved here
101 * because we pretend that the child's original message wasn't sent until
102 * after the parent's message is finished being dispatched.
104 * When waiting for a sync message reply, we dispatch an async message only if
105 * it is NESTED_INSIDE_CPOW. Normally NESTED_INSIDE_CPOW async
106 * messages are sent only from the child. However, the parent can send
107 * NESTED_INSIDE_CPOW async messages when it is creating a bridged protocol.
110 using namespace mozilla;
111 using namespace mozilla::ipc;
113 using mozilla::MonitorAutoLock;
114 using mozilla::MonitorAutoUnlock;
115 using mozilla::dom::AutoNoJSAPI;
117 #define IPC_ASSERT(_cond, ...) \
118 do { \
119 AssertWorkerThread(); \
120 mMonitor->AssertCurrentThreadOwns(); \
121 if (!(_cond)) DebugAbort(__FILE__, __LINE__, #_cond, ##__VA_ARGS__); \
122 } while (0)
124 static MessageChannel* gParentProcessBlocker = nullptr;
126 namespace mozilla {
127 namespace ipc {
129 static const uint32_t kMinTelemetryMessageSize = 4096;
131 // Note: we round the time we spend waiting for a response to the nearest
132 // millisecond. So a min value of 1 ms actually captures from 500us and above.
133 // This is used for both the sending and receiving side telemetry for sync IPC,
134 // (IPC_SYNC_MAIN_LATENCY_MS and IPC_SYNC_RECEIVE_MS).
135 static const uint32_t kMinTelemetrySyncIPCLatencyMs = 1;
137 // static
138 bool MessageChannel::sIsPumpingMessages = false;
140 class AutoEnterTransaction {
141 public:
142 explicit AutoEnterTransaction(MessageChannel* aChan, int32_t aMsgSeqno,
143 int32_t aTransactionID, int aNestedLevel)
144 MOZ_REQUIRES(*aChan->mMonitor)
145 : mChan(aChan),
146 mActive(true),
147 mOutgoing(true),
148 mNestedLevel(aNestedLevel),
149 mSeqno(aMsgSeqno),
150 mTransaction(aTransactionID),
151 mNext(mChan->mTransactionStack) {
152 mChan->mMonitor->AssertCurrentThreadOwns();
153 mChan->mTransactionStack = this;
156 explicit AutoEnterTransaction(MessageChannel* aChan,
157 const IPC::Message& aMessage)
158 MOZ_REQUIRES(*aChan->mMonitor)
159 : mChan(aChan),
160 mActive(true),
161 mOutgoing(false),
162 mNestedLevel(aMessage.nested_level()),
163 mSeqno(aMessage.seqno()),
164 mTransaction(aMessage.transaction_id()),
165 mNext(mChan->mTransactionStack) {
166 mChan->mMonitor->AssertCurrentThreadOwns();
168 if (!aMessage.is_sync()) {
169 mActive = false;
170 return;
173 mChan->mTransactionStack = this;
176 ~AutoEnterTransaction() {
177 mChan->mMonitor->AssertCurrentThreadOwns();
178 if (mActive) {
179 mChan->mTransactionStack = mNext;
183 void Cancel() {
184 mChan->mMonitor->AssertCurrentThreadOwns();
185 AutoEnterTransaction* cur = mChan->mTransactionStack;
186 MOZ_RELEASE_ASSERT(cur == this);
187 while (cur && cur->mNestedLevel != IPC::Message::NOT_NESTED) {
188 // Note that, in the following situation, we will cancel multiple
189 // transactions:
190 // 1. Parent sends NESTED_INSIDE_SYNC message P1 to child.
191 // 2. Child sends NESTED_INSIDE_SYNC message C1 to child.
192 // 3. Child dispatches P1, parent blocks.
193 // 4. Child cancels.
194 // In this case, both P1 and C1 are cancelled. The parent will
195 // remove C1 from its queue when it gets the cancellation message.
196 MOZ_RELEASE_ASSERT(cur->mActive);
197 cur->mActive = false;
198 cur = cur->mNext;
201 mChan->mTransactionStack = cur;
203 MOZ_RELEASE_ASSERT(IsComplete());
206 bool AwaitingSyncReply() const {
207 MOZ_RELEASE_ASSERT(mActive);
208 if (mOutgoing) {
209 return true;
211 return mNext ? mNext->AwaitingSyncReply() : false;
214 int AwaitingSyncReplyNestedLevel() const {
215 MOZ_RELEASE_ASSERT(mActive);
216 if (mOutgoing) {
217 return mNestedLevel;
219 return mNext ? mNext->AwaitingSyncReplyNestedLevel() : 0;
222 bool DispatchingSyncMessage() const {
223 MOZ_RELEASE_ASSERT(mActive);
224 if (!mOutgoing) {
225 return true;
227 return mNext ? mNext->DispatchingSyncMessage() : false;
230 int DispatchingSyncMessageNestedLevel() const {
231 MOZ_RELEASE_ASSERT(mActive);
232 if (!mOutgoing) {
233 return mNestedLevel;
235 return mNext ? mNext->DispatchingSyncMessageNestedLevel() : 0;
238 int NestedLevel() const {
239 MOZ_RELEASE_ASSERT(mActive);
240 return mNestedLevel;
243 int32_t SequenceNumber() const {
244 MOZ_RELEASE_ASSERT(mActive);
245 return mSeqno;
248 int32_t TransactionID() const {
249 MOZ_RELEASE_ASSERT(mActive);
250 return mTransaction;
253 void ReceivedReply(UniquePtr<IPC::Message> aMessage) {
254 MOZ_RELEASE_ASSERT(aMessage->seqno() == mSeqno);
255 MOZ_RELEASE_ASSERT(aMessage->transaction_id() == mTransaction);
256 MOZ_RELEASE_ASSERT(!mReply);
257 IPC_LOG("Reply received on worker thread: seqno=%d", mSeqno);
258 mReply = std::move(aMessage);
259 MOZ_RELEASE_ASSERT(IsComplete());
262 void HandleReply(UniquePtr<IPC::Message> aMessage) {
263 mChan->mMonitor->AssertCurrentThreadOwns();
264 AutoEnterTransaction* cur = mChan->mTransactionStack;
265 MOZ_RELEASE_ASSERT(cur == this);
266 while (cur) {
267 MOZ_RELEASE_ASSERT(cur->mActive);
268 if (aMessage->seqno() == cur->mSeqno) {
269 cur->ReceivedReply(std::move(aMessage));
270 break;
272 cur = cur->mNext;
273 MOZ_RELEASE_ASSERT(cur);
277 bool IsComplete() { return !mActive || mReply; }
279 bool IsOutgoing() { return mOutgoing; }
281 bool IsCanceled() { return !mActive; }
283 bool IsBottom() const { return !mNext; }
285 bool IsError() {
286 MOZ_RELEASE_ASSERT(mReply);
287 return mReply->is_reply_error();
290 UniquePtr<IPC::Message> GetReply() { return std::move(mReply); }
292 private:
293 MessageChannel* mChan;
295 // Active is true if this transaction is on the mChan->mTransactionStack
296 // stack. Generally we're not on the stack if the transaction was canceled
297 // or if it was for a message that doesn't require transactions (an async
298 // message).
299 bool mActive;
301 // Is this stack frame for an outgoing message?
302 bool mOutgoing;
304 // Properties of the message being sent/received.
305 int mNestedLevel;
306 int32_t mSeqno;
307 int32_t mTransaction;
309 // Next item in mChan->mTransactionStack.
310 AutoEnterTransaction* mNext;
312 // Pointer the a reply received for this message, if one was received.
313 UniquePtr<IPC::Message> mReply;
316 class PendingResponseReporter final : public nsIMemoryReporter {
317 ~PendingResponseReporter() = default;
319 public:
320 NS_DECL_THREADSAFE_ISUPPORTS
322 NS_IMETHOD
323 CollectReports(nsIHandleReportCallback* aHandleReport, nsISupports* aData,
324 bool aAnonymize) override {
325 MOZ_COLLECT_REPORT(
326 "unresolved-ipc-responses", KIND_OTHER, UNITS_COUNT,
327 MessageChannel::gUnresolvedResponses,
328 "Outstanding IPC async message responses that are still not resolved.");
329 return NS_OK;
333 NS_IMPL_ISUPPORTS(PendingResponseReporter, nsIMemoryReporter)
335 class ChannelCountReporter final : public nsIMemoryReporter {
336 ~ChannelCountReporter() = default;
338 struct ChannelCounts {
339 size_t mNow;
340 size_t mMax;
342 ChannelCounts() : mNow(0), mMax(0) {}
344 void Inc() {
345 ++mNow;
346 if (mMax < mNow) {
347 mMax = mNow;
351 void Dec() {
352 MOZ_ASSERT(mNow > 0);
353 --mNow;
357 using CountTable = nsTHashMap<nsDepCharHashKey, ChannelCounts>;
359 static StaticMutex sChannelCountMutex;
360 static CountTable* sChannelCounts MOZ_GUARDED_BY(sChannelCountMutex);
362 public:
363 NS_DECL_THREADSAFE_ISUPPORTS
365 NS_IMETHOD
366 CollectReports(nsIHandleReportCallback* aHandleReport, nsISupports* aData,
367 bool aAnonymize) override {
368 AutoTArray<std::pair<const char*, ChannelCounts>, 16> counts;
370 StaticMutexAutoLock countLock(sChannelCountMutex);
371 if (!sChannelCounts) {
372 return NS_OK;
374 counts.SetCapacity(sChannelCounts->Count());
375 for (const auto& entry : *sChannelCounts) {
376 counts.AppendElement(std::pair{entry.GetKey(), entry.GetData()});
380 for (const auto& entry : counts) {
381 nsPrintfCString pathNow("ipc-channels/%s", entry.first);
382 nsPrintfCString pathMax("ipc-channels-peak/%s", entry.first);
383 nsPrintfCString descNow(
384 "Number of IPC channels for"
385 " top-level actor type %s",
386 entry.first);
387 nsPrintfCString descMax(
388 "Peak number of IPC channels for"
389 " top-level actor type %s",
390 entry.first);
392 aHandleReport->Callback(""_ns, pathNow, KIND_OTHER, UNITS_COUNT,
393 entry.second.mNow, descNow, aData);
394 aHandleReport->Callback(""_ns, pathMax, KIND_OTHER, UNITS_COUNT,
395 entry.second.mMax, descMax, aData);
397 return NS_OK;
400 static void Increment(const char* aName) {
401 StaticMutexAutoLock countLock(sChannelCountMutex);
402 if (!sChannelCounts) {
403 sChannelCounts = new CountTable;
405 sChannelCounts->LookupOrInsert(aName).Inc();
408 static void Decrement(const char* aName) {
409 StaticMutexAutoLock countLock(sChannelCountMutex);
410 MOZ_ASSERT(sChannelCounts);
411 sChannelCounts->LookupOrInsert(aName).Dec();
415 StaticMutex ChannelCountReporter::sChannelCountMutex;
416 ChannelCountReporter::CountTable* ChannelCountReporter::sChannelCounts;
418 NS_IMPL_ISUPPORTS(ChannelCountReporter, nsIMemoryReporter)
420 // In child processes, the first MessageChannel is created before
421 // XPCOM is initialized enough to construct the memory reporter
422 // manager. This retries every time a MessageChannel is constructed,
423 // which is good enough in practice.
424 template <class Reporter>
425 static void TryRegisterStrongMemoryReporter() {
426 static Atomic<bool> registered;
427 if (registered.compareExchange(false, true)) {
428 RefPtr<Reporter> reporter = new Reporter();
429 if (NS_FAILED(RegisterStrongMemoryReporter(reporter))) {
430 registered = false;
435 Atomic<size_t> MessageChannel::gUnresolvedResponses;
437 MessageChannel::MessageChannel(const char* aName, IToplevelProtocol* aListener)
438 : mName(aName), mListener(aListener), mMonitor(new RefCountedMonitor()) {
439 MOZ_COUNT_CTOR(ipc::MessageChannel);
441 #ifdef XP_WIN
442 mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
443 MOZ_RELEASE_ASSERT(mEvent, "CreateEvent failed! Nothing is going to work!");
444 #endif
446 TryRegisterStrongMemoryReporter<PendingResponseReporter>();
447 TryRegisterStrongMemoryReporter<ChannelCountReporter>();
450 MessageChannel::~MessageChannel() {
451 MOZ_COUNT_DTOR(ipc::MessageChannel);
452 MonitorAutoLock lock(*mMonitor);
453 MOZ_RELEASE_ASSERT(!mOnCxxStack,
454 "MessageChannel destroyed while code on CxxStack");
455 #ifdef XP_WIN
456 if (mEvent) {
457 BOOL ok = CloseHandle(mEvent);
458 mEvent = nullptr;
460 if (!ok) {
461 gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure)
462 << "MessageChannel failed to close. GetLastError: " << GetLastError();
464 MOZ_RELEASE_ASSERT(ok);
465 } else {
466 gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure)
467 << "MessageChannel destructor ran without an mEvent Handle";
469 #endif
471 // Make sure that the MessageChannel was closed (and therefore cleared) before
472 // it was destroyed. We can't properly close the channel at this point, as it
473 // would be unsafe to invoke our listener's callbacks, and we may be being
474 // destroyed on a thread other than `mWorkerThread`.
475 if (!IsClosedLocked()) {
476 CrashReporter::AnnotateCrashReport(
477 CrashReporter::Annotation::IPCFatalErrorProtocol,
478 nsDependentCString(mName));
479 switch (mChannelState) {
480 case ChannelConnected:
481 MOZ_CRASH(
482 "MessageChannel destroyed without being closed "
483 "(mChannelState == ChannelConnected).");
484 break;
485 case ChannelClosing:
486 MOZ_CRASH(
487 "MessageChannel destroyed without being closed "
488 "(mChannelState == ChannelClosing).");
489 break;
490 case ChannelError:
491 MOZ_CRASH(
492 "MessageChannel destroyed without being closed "
493 "(mChannelState == ChannelError).");
494 break;
495 default:
496 MOZ_CRASH("MessageChannel destroyed without being closed.");
500 // Double-check other properties for thoroughness.
501 MOZ_RELEASE_ASSERT(!mLink);
502 MOZ_RELEASE_ASSERT(mPendingResponses.empty());
503 MOZ_RELEASE_ASSERT(!mChannelErrorTask);
504 MOZ_RELEASE_ASSERT(mPending.isEmpty());
505 MOZ_RELEASE_ASSERT(!mShutdownTask);
508 #ifdef DEBUG
509 void MessageChannel::AssertMaybeDeferredCountCorrect() {
510 mMonitor->AssertCurrentThreadOwns();
512 size_t count = 0;
513 for (MessageTask* task : mPending) {
514 task->AssertMonitorHeld(*mMonitor);
515 if (!IsAlwaysDeferred(*task->Msg())) {
516 count++;
520 MOZ_ASSERT(count == mMaybeDeferredPendingCount);
522 #endif
524 // This function returns the current transaction ID. Since the notion of a
525 // "current transaction" can be hard to define when messages race with each
526 // other and one gets canceled and the other doesn't, we require that this
527 // function is only called when the current transaction is known to be for a
528 // NESTED_INSIDE_SYNC message. In that case, we know for sure what the caller is
529 // looking for.
530 int32_t MessageChannel::CurrentNestedInsideSyncTransaction() const {
531 mMonitor->AssertCurrentThreadOwns();
532 if (!mTransactionStack) {
533 return 0;
535 MOZ_RELEASE_ASSERT(mTransactionStack->NestedLevel() ==
536 IPC::Message::NESTED_INSIDE_SYNC);
537 return mTransactionStack->TransactionID();
540 bool MessageChannel::AwaitingSyncReply() const {
541 mMonitor->AssertCurrentThreadOwns();
542 return mTransactionStack ? mTransactionStack->AwaitingSyncReply() : false;
545 int MessageChannel::AwaitingSyncReplyNestedLevel() const {
546 mMonitor->AssertCurrentThreadOwns();
547 return mTransactionStack ? mTransactionStack->AwaitingSyncReplyNestedLevel()
548 : 0;
551 bool MessageChannel::DispatchingSyncMessage() const {
552 mMonitor->AssertCurrentThreadOwns();
553 return mTransactionStack ? mTransactionStack->DispatchingSyncMessage()
554 : false;
557 int MessageChannel::DispatchingSyncMessageNestedLevel() const {
558 mMonitor->AssertCurrentThreadOwns();
559 return mTransactionStack
560 ? mTransactionStack->DispatchingSyncMessageNestedLevel()
561 : 0;
564 static void PrintErrorMessage(Side side, const char* channelName,
565 const char* msg) {
566 printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", StringFromIPCSide(side),
567 channelName, msg);
570 bool MessageChannel::Connected() const {
571 mMonitor->AssertCurrentThreadOwns();
572 return ChannelConnected == mChannelState;
575 bool MessageChannel::ConnectedOrClosing() const {
576 mMonitor->AssertCurrentThreadOwns();
577 return ChannelConnected == mChannelState || ChannelClosing == mChannelState;
580 bool MessageChannel::CanSend() const {
581 if (!mMonitor) {
582 return false;
584 MonitorAutoLock lock(*mMonitor);
585 return Connected();
588 void MessageChannel::Clear() {
589 AssertWorkerThread();
590 mMonitor->AssertCurrentThreadOwns();
591 MOZ_DIAGNOSTIC_ASSERT(IsClosedLocked(), "MessageChannel cleared too early?");
592 MOZ_ASSERT(ChannelClosed == mChannelState || ChannelError == mChannelState);
594 // Don't clear mWorkerThread; we use it in AssertWorkerThread().
596 // Also don't clear mListener. If we clear it, then sending a message
597 // through this channel after it's Clear()'ed can cause this process to
598 // crash.
600 if (mShutdownTask) {
601 mShutdownTask->Clear();
602 mWorkerThread->UnregisterShutdownTask(mShutdownTask);
604 mShutdownTask = nullptr;
606 if (NS_IsMainThread() && gParentProcessBlocker == this) {
607 gParentProcessBlocker = nullptr;
610 gUnresolvedResponses -= mPendingResponses.size();
612 CallbackMap map = std::move(mPendingResponses);
613 MonitorAutoUnlock unlock(*mMonitor);
614 for (auto& pair : map) {
615 pair.second->Reject(ResponseRejectReason::ChannelClosed);
618 mPendingResponses.clear();
620 SetIsCrossProcess(false);
622 mLink = nullptr;
624 if (mChannelErrorTask) {
625 mChannelErrorTask->Cancel();
626 mChannelErrorTask = nullptr;
629 if (mFlushLazySendTask) {
630 mFlushLazySendTask->Cancel();
631 mFlushLazySendTask = nullptr;
634 // Free up any memory used by pending messages.
635 mPending.clear();
637 mMaybeDeferredPendingCount = 0;
640 bool MessageChannel::Open(ScopedPort aPort, Side aSide,
641 const nsID& aMessageChannelId,
642 nsISerialEventTarget* aEventTarget) {
643 nsCOMPtr<nsISerialEventTarget> eventTarget =
644 aEventTarget ? aEventTarget : GetCurrentSerialEventTarget();
645 MOZ_RELEASE_ASSERT(eventTarget,
646 "Must open MessageChannel on a nsISerialEventTarget");
647 MOZ_RELEASE_ASSERT(eventTarget->IsOnCurrentThread(),
648 "Must open MessageChannel from worker thread");
650 auto shutdownTask = MakeRefPtr<WorkerTargetShutdownTask>(eventTarget, this);
651 nsresult rv = eventTarget->RegisterShutdownTask(shutdownTask);
652 MOZ_ASSERT(rv != NS_ERROR_NOT_IMPLEMENTED,
653 "target for MessageChannel must support shutdown tasks");
654 if (rv == NS_ERROR_UNEXPECTED) {
655 // If shutdown tasks have already started running, dispatch our shutdown
656 // task manually.
657 NS_WARNING("Opening MessageChannel on EventTarget in shutdown");
658 rv = eventTarget->Dispatch(shutdownTask->AsRunnable());
660 MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv),
661 "error registering ShutdownTask for MessageChannel");
664 MonitorAutoLock lock(*mMonitor);
665 MOZ_RELEASE_ASSERT(!mLink, "Open() called > once");
666 MOZ_RELEASE_ASSERT(ChannelClosed == mChannelState, "Not currently closed");
667 MOZ_ASSERT(mSide == UnknownSide);
669 mMessageChannelId = aMessageChannelId;
670 mWorkerThread = eventTarget;
671 mShutdownTask = shutdownTask;
672 mLink = MakeUnique<PortLink>(this, std::move(aPort));
673 mChannelState = ChannelConnected;
674 mSide = aSide;
677 // Notify our listener that the underlying IPC channel has been established.
678 // IProtocol will use this callback to create the ActorLifecycleProxy, and
679 // perform an `AddRef` call to keep the actor alive until the channel is
680 // disconnected.
682 // We unlock our monitor before calling `OnIPCChannelOpened` to ensure that
683 // any calls back into `MessageChannel` do not deadlock. At this point, we may
684 // be receiving messages on the IO thread, however we cannot process them on
685 // the worker thread or have notified our listener until after this function
686 // returns.
687 mListener->OnIPCChannelOpened();
688 return true;
691 static Side GetOppSide(Side aSide) {
692 switch (aSide) {
693 case ChildSide:
694 return ParentSide;
695 case ParentSide:
696 return ChildSide;
697 default:
698 return UnknownSide;
702 bool MessageChannel::Open(MessageChannel* aTargetChan,
703 nsISerialEventTarget* aEventTarget, Side aSide) {
704 // Opens a connection to another thread in the same process.
706 MOZ_ASSERT(aTargetChan, "Need a target channel");
708 nsID channelId = nsID::GenerateUUID();
710 std::pair<ScopedPort, ScopedPort> ports =
711 NodeController::GetSingleton()->CreatePortPair();
713 // NOTE: This dispatch must be sync as it captures locals by non-owning
714 // reference, however we can't use `NS_DispatchAndSpinEventLoopUntilComplete`
715 // as that will spin a nested event loop, and doesn't work with certain types
716 // of calling event targets.
717 base::WaitableEvent event(/* manual_reset */ true,
718 /* initially_signaled */ false);
719 MOZ_ALWAYS_SUCCEEDS(aEventTarget->Dispatch(NS_NewCancelableRunnableFunction(
720 "ipc::MessageChannel::OpenAsOtherThread", [&]() {
721 aTargetChan->Open(std::move(ports.second), GetOppSide(aSide), channelId,
722 aEventTarget);
723 event.Signal();
724 })));
725 bool ok = event.Wait();
726 MOZ_RELEASE_ASSERT(ok);
728 // Now that the other side has connected, open the port on our side.
729 return Open(std::move(ports.first), aSide, channelId);
732 bool MessageChannel::OpenOnSameThread(MessageChannel* aTargetChan,
733 mozilla::ipc::Side aSide) {
734 auto [porta, portb] = NodeController::GetSingleton()->CreatePortPair();
736 nsID channelId = nsID::GenerateUUID();
738 aTargetChan->mIsSameThreadChannel = true;
739 mIsSameThreadChannel = true;
741 auto* currentThread = GetCurrentSerialEventTarget();
742 return aTargetChan->Open(std::move(portb), GetOppSide(aSide), channelId,
743 currentThread) &&
744 Open(std::move(porta), aSide, channelId, currentThread);
747 bool MessageChannel::Send(UniquePtr<Message> aMsg) {
748 if (aMsg->size() >= kMinTelemetryMessageSize) {
749 Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE2, aMsg->size());
752 MOZ_RELEASE_ASSERT(!aMsg->is_sync());
753 MOZ_RELEASE_ASSERT(aMsg->nested_level() != IPC::Message::NESTED_INSIDE_SYNC);
755 AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true);
757 AssertWorkerThread();
758 mMonitor->AssertNotCurrentThreadOwns();
759 if (MSG_ROUTING_NONE == aMsg->routing_id()) {
760 ReportMessageRouteError("MessageChannel::Send");
761 return false;
764 if (aMsg->seqno() == 0) {
765 aMsg->set_seqno(NextSeqno());
768 MonitorAutoLock lock(*mMonitor);
769 if (!Connected()) {
770 ReportConnectionError("Send", aMsg->type());
771 return false;
774 AddProfilerMarker(*aMsg, MessageDirection::eSending);
775 SendMessageToLink(std::move(aMsg));
776 return true;
779 void MessageChannel::SendMessageToLink(UniquePtr<Message> aMsg) {
780 AssertWorkerThread();
781 mMonitor->AssertCurrentThreadOwns();
783 // If the channel is not cross-process, there's no reason to be lazy, so we
784 // ignore the flag in that case.
785 if (aMsg->is_lazy_send() && mIsCrossProcess) {
786 // If this is the first lazy message in the queue and our worker thread
787 // supports direct task dispatch, dispatch a task to flush messages,
788 // ensuring we don't leave them pending forever.
789 if (!mFlushLazySendTask) {
790 if (nsCOMPtr<nsIDirectTaskDispatcher> dispatcher =
791 do_QueryInterface(mWorkerThread)) {
792 mFlushLazySendTask = new FlushLazySendMessagesRunnable(this);
793 MOZ_ALWAYS_SUCCEEDS(
794 dispatcher->DispatchDirectTask(do_AddRef(mFlushLazySendTask)));
797 if (mFlushLazySendTask) {
798 mFlushLazySendTask->PushMessage(std::move(aMsg));
799 return;
803 if (mFlushLazySendTask) {
804 FlushLazySendMessages();
806 mLink->SendMessage(std::move(aMsg));
809 void MessageChannel::FlushLazySendMessages() {
810 AssertWorkerThread();
811 mMonitor->AssertCurrentThreadOwns();
813 // Clean up any SendLazyTask which might be pending.
814 auto messages = mFlushLazySendTask->TakeMessages();
815 mFlushLazySendTask = nullptr;
817 // Send all lazy messages, then clear the queue.
818 for (auto& msg : messages) {
819 mLink->SendMessage(std::move(msg));
823 UniquePtr<MessageChannel::UntypedCallbackHolder> MessageChannel::PopCallback(
824 const Message& aMsg, int32_t aActorId) {
825 auto iter = mPendingResponses.find(aMsg.seqno());
826 if (iter != mPendingResponses.end() && iter->second->mActorId == aActorId &&
827 iter->second->mReplyMsgId == aMsg.type()) {
828 UniquePtr<MessageChannel::UntypedCallbackHolder> ret =
829 std::move(iter->second);
830 mPendingResponses.erase(iter);
831 gUnresolvedResponses--;
832 return ret;
834 return nullptr;
837 void MessageChannel::RejectPendingResponsesForActor(int32_t aActorId) {
838 auto itr = mPendingResponses.begin();
839 while (itr != mPendingResponses.end()) {
840 if (itr->second.get()->mActorId != aActorId) {
841 ++itr;
842 continue;
844 itr->second.get()->Reject(ResponseRejectReason::ActorDestroyed);
845 // Take special care of advancing the iterator since we are
846 // removing it while iterating.
847 itr = mPendingResponses.erase(itr);
848 gUnresolvedResponses--;
852 class BuildIDsMatchMessage : public IPC::Message {
853 public:
854 BuildIDsMatchMessage()
855 : IPC::Message(MSG_ROUTING_NONE, BUILD_IDS_MATCH_MESSAGE_TYPE) {}
856 void Log(const std::string& aPrefix, FILE* aOutf) const {
857 fputs("(special `Build IDs match' message)", aOutf);
861 // Send the parent a special async message to confirm when the parent and child
862 // are of the same buildID. Skips sending the message and returns false if the
863 // buildIDs don't match. This is a minor variation on
864 // MessageChannel::Send(Message* aMsg).
865 bool MessageChannel::SendBuildIDsMatchMessage(const char* aParentBuildID) {
866 MOZ_ASSERT(!XRE_IsParentProcess());
868 nsCString parentBuildID(aParentBuildID);
869 nsCString childBuildID(mozilla::PlatformBuildID());
871 if (parentBuildID != childBuildID) {
872 // The build IDs didn't match, usually because an update occurred in the
873 // background.
874 return false;
877 auto msg = MakeUnique<BuildIDsMatchMessage>();
879 MOZ_RELEASE_ASSERT(!msg->is_sync());
880 MOZ_RELEASE_ASSERT(msg->nested_level() != IPC::Message::NESTED_INSIDE_SYNC);
882 AssertWorkerThread();
883 mMonitor->AssertNotCurrentThreadOwns();
884 // Don't check for MSG_ROUTING_NONE.
886 MonitorAutoLock lock(*mMonitor);
887 if (!Connected()) {
888 ReportConnectionError("SendBuildIDsMatchMessage", msg->type());
889 return false;
892 #if defined(MOZ_DEBUG) && defined(ENABLE_TESTS)
893 // Technically, the behavior is interesting for any kind of process
894 // but when exercising tests, we want to crash only a content process and
895 // avoid making noise with other kind of processes crashing
896 if (const char* dontSend = PR_GetEnv("MOZ_BUILDID_MATCH_DONTSEND")) {
897 if (dontSend[0] == '1') {
898 // Bug 1732999: We are going to crash, so we need to advise leak check
899 // tooling to avoid intermittent missing leakcheck
900 NoteIntentionalCrash(XRE_GetProcessTypeString());
901 if (XRE_IsContentProcess()) {
902 return false;
906 #endif
908 SendMessageToLink(std::move(msg));
909 return true;
912 class CancelMessage : public IPC::Message {
913 public:
914 explicit CancelMessage(int transaction)
915 : IPC::Message(MSG_ROUTING_NONE, CANCEL_MESSAGE_TYPE) {
916 set_transaction_id(transaction);
918 static bool Read(const Message* msg) { return true; }
919 void Log(const std::string& aPrefix, FILE* aOutf) const {
920 fputs("(special `Cancel' message)", aOutf);
924 bool MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg) {
925 mMonitor->AssertCurrentThreadOwns();
927 if (MSG_ROUTING_NONE == aMsg.routing_id()) {
928 if (GOODBYE_MESSAGE_TYPE == aMsg.type()) {
929 // We've received a GOODBYE message, close the connection and mark
930 // ourselves as "Closing".
931 mLink->Close();
932 mChannelState = ChannelClosing;
933 if (LoggingEnabledFor(mListener->GetProtocolName(), mSide)) {
934 printf(
935 "[%s %u] NOTE: %s%s actor received `Goodbye' message. Closing "
936 "channel.\n",
937 XRE_GeckoProcessTypeToString(XRE_GetProcessType()),
938 static_cast<uint32_t>(base::GetCurrentProcId()),
939 mListener->GetProtocolName(), StringFromIPCSide(mSide));
942 // Notify the worker thread that the connection has been closed, as we
943 // will not receive an `OnChannelErrorFromLink` after calling
944 // `mLink->Close()`.
945 if (AwaitingSyncReply()) {
946 NotifyWorkerThread();
948 PostErrorNotifyTask();
949 return true;
950 } else if (CANCEL_MESSAGE_TYPE == aMsg.type()) {
951 IPC_LOG("Cancel from message");
952 CancelTransaction(aMsg.transaction_id());
953 NotifyWorkerThread();
954 return true;
955 } else if (BUILD_IDS_MATCH_MESSAGE_TYPE == aMsg.type()) {
956 IPC_LOG("Build IDs match message");
957 mBuildIDsConfirmedMatch = true;
958 return true;
959 } else if (IMPENDING_SHUTDOWN_MESSAGE_TYPE == aMsg.type()) {
960 IPC_LOG("Impending Shutdown received");
961 ProcessChild::NotifiedImpendingShutdown();
962 return true;
965 return false;
968 /* static */
969 bool MessageChannel::IsAlwaysDeferred(const Message& aMsg) {
970 // If a message is not NESTED_INSIDE_CPOW and not sync, then we always defer
971 // it.
972 return aMsg.nested_level() != IPC::Message::NESTED_INSIDE_CPOW &&
973 !aMsg.is_sync();
976 bool MessageChannel::ShouldDeferMessage(const Message& aMsg) {
977 // Never defer messages that have the highest nested level, even async
978 // ones. This is safe because only the child can send these messages, so
979 // they can never nest.
980 if (aMsg.nested_level() == IPC::Message::NESTED_INSIDE_CPOW) {
981 MOZ_ASSERT(!IsAlwaysDeferred(aMsg));
982 return false;
985 // Unless they're NESTED_INSIDE_CPOW, we always defer async messages.
986 // Note that we never send an async NESTED_INSIDE_SYNC message.
987 if (!aMsg.is_sync()) {
988 MOZ_RELEASE_ASSERT(aMsg.nested_level() == IPC::Message::NOT_NESTED);
989 MOZ_ASSERT(IsAlwaysDeferred(aMsg));
990 return true;
993 MOZ_ASSERT(!IsAlwaysDeferred(aMsg));
995 int msgNestedLevel = aMsg.nested_level();
996 int waitingNestedLevel = AwaitingSyncReplyNestedLevel();
998 // Always defer if the nested level of the incoming message is less than the
999 // nested level of the message we're awaiting.
1000 if (msgNestedLevel < waitingNestedLevel) return true;
1002 // Never defer if the message has strictly greater nested level.
1003 if (msgNestedLevel > waitingNestedLevel) return false;
1005 // When both sides send sync messages of the same nested level, we resolve the
1006 // race by dispatching in the child and deferring the incoming message in
1007 // the parent. However, the parent still needs to dispatch nested sync
1008 // messages.
1010 // Deferring in the parent only sort of breaks message ordering. When the
1011 // child's message comes in, we can pretend the child hasn't quite
1012 // finished sending it yet. Since the message is sync, we know that the
1013 // child hasn't moved on yet.
1014 return mSide == ParentSide &&
1015 aMsg.transaction_id() != CurrentNestedInsideSyncTransaction();
1018 void MessageChannel::OnMessageReceivedFromLink(UniquePtr<Message> aMsg) {
1019 mMonitor->AssertCurrentThreadOwns();
1020 MOZ_ASSERT(mChannelState == ChannelConnected);
1022 if (MaybeInterceptSpecialIOMessage(*aMsg)) {
1023 return;
1026 mListener->OnChannelReceivedMessage(*aMsg);
1028 // If we're awaiting a sync reply, we know that it needs to be immediately
1029 // handled to unblock us.
1030 if (aMsg->is_sync() && aMsg->is_reply()) {
1031 IPC_LOG("Received reply seqno=%d xid=%d", aMsg->seqno(),
1032 aMsg->transaction_id());
1034 if (aMsg->seqno() == mTimedOutMessageSeqno) {
1035 // Drop the message, but allow future sync messages to be sent.
1036 IPC_LOG("Received reply to timedout message; igoring; xid=%d",
1037 mTimedOutMessageSeqno);
1038 EndTimeout();
1039 return;
1042 MOZ_RELEASE_ASSERT(AwaitingSyncReply());
1043 MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno);
1045 mTransactionStack->HandleReply(std::move(aMsg));
1046 NotifyWorkerThread();
1047 return;
1050 // Nested messages cannot be compressed.
1051 MOZ_RELEASE_ASSERT(aMsg->compress_type() == IPC::Message::COMPRESSION_NONE ||
1052 aMsg->nested_level() == IPC::Message::NOT_NESTED);
1054 if (aMsg->compress_type() == IPC::Message::COMPRESSION_ENABLED &&
1055 !mPending.isEmpty()) {
1056 auto* last = mPending.getLast();
1057 last->AssertMonitorHeld(*mMonitor);
1058 bool compress = last->Msg()->type() == aMsg->type() &&
1059 last->Msg()->routing_id() == aMsg->routing_id();
1060 if (compress) {
1061 // This message type has compression enabled, and the back of the
1062 // queue was the same message type and routed to the same destination.
1063 // Replace it with the newer message.
1064 MOZ_RELEASE_ASSERT(last->Msg()->compress_type() ==
1065 IPC::Message::COMPRESSION_ENABLED);
1066 last->Msg() = std::move(aMsg);
1067 return;
1069 } else if (aMsg->compress_type() == IPC::Message::COMPRESSION_ALL &&
1070 !mPending.isEmpty()) {
1071 for (MessageTask* p = mPending.getLast(); p; p = p->getPrevious()) {
1072 p->AssertMonitorHeld(*mMonitor);
1073 if (p->Msg()->type() == aMsg->type() &&
1074 p->Msg()->routing_id() == aMsg->routing_id()) {
1075 // This message type has compression enabled, and the queue
1076 // holds a message with the same message type and routed to the
1077 // same destination. Erase it. Note that, since we always
1078 // compress these redundancies, There Can Be Only One.
1079 MOZ_RELEASE_ASSERT(p->Msg()->compress_type() ==
1080 IPC::Message::COMPRESSION_ALL);
1081 MOZ_RELEASE_ASSERT(IsAlwaysDeferred(*p->Msg()));
1082 p->remove();
1083 break;
1088 bool alwaysDeferred = IsAlwaysDeferred(*aMsg);
1090 bool shouldWakeUp = AwaitingSyncReply() && !ShouldDeferMessage(*aMsg);
1092 IPC_LOG("Receive from link; seqno=%d, xid=%d, shouldWakeUp=%d", aMsg->seqno(),
1093 aMsg->transaction_id(), shouldWakeUp);
1095 // There are two cases we're concerned about, relating to the state of the
1096 // worker thread:
1098 // (1) We are waiting on a sync reply - worker thread is blocked on the
1099 // IPC monitor.
1100 // - If the message is NESTED_INSIDE_SYNC, we wake up the worker thread to
1101 // deliver the message depending on ShouldDeferMessage. Otherwise, we
1102 // leave it in the mPending queue, posting a task to the worker event
1103 // loop, where it will be processed once the synchronous reply has been
1104 // received.
1106 // (2) We are not waiting on a reply.
1107 // - We post a task to the worker event loop.
1109 // Note that, we may notify the worker thread even though the monitor is not
1110 // blocked. This is okay, since we always check for pending events before
1111 // blocking again.
1113 RefPtr<MessageTask> task = new MessageTask(this, std::move(aMsg));
1114 mPending.insertBack(task);
1116 if (!alwaysDeferred) {
1117 mMaybeDeferredPendingCount++;
1120 if (shouldWakeUp) {
1121 NotifyWorkerThread();
1124 // Although we usually don't need to post a message task if
1125 // shouldWakeUp is true, it's easier to post anyway than to have to
1126 // guarantee that every Send call processes everything it's supposed to
1127 // before returning.
1128 task->AssertMonitorHeld(*mMonitor);
1129 task->Post();
1132 void MessageChannel::PeekMessages(
1133 const std::function<bool(const Message& aMsg)>& aInvoke) {
1134 // FIXME: We shouldn't be holding the lock for aInvoke!
1135 MonitorAutoLock lock(*mMonitor);
1137 for (MessageTask* it : mPending) {
1138 it->AssertMonitorHeld(*mMonitor);
1139 const Message& msg = *it->Msg();
1140 if (!aInvoke(msg)) {
1141 break;
1146 void MessageChannel::ProcessPendingRequests(
1147 ActorLifecycleProxy* aProxy, AutoEnterTransaction& aTransaction) {
1148 mMonitor->AssertCurrentThreadOwns();
1150 AssertMaybeDeferredCountCorrect();
1151 if (mMaybeDeferredPendingCount == 0) {
1152 return;
1155 IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d",
1156 aTransaction.SequenceNumber(), aTransaction.TransactionID());
1158 // Loop until there aren't any more nested messages to process.
1159 for (;;) {
1160 // If we canceled during ProcessPendingRequest, then we need to leave
1161 // immediately because the results of ShouldDeferMessage will be
1162 // operating with weird state (as if no Send is in progress). That could
1163 // cause even NOT_NESTED sync messages to be processed (but not
1164 // NOT_NESTED async messages), which would break message ordering.
1165 if (aTransaction.IsCanceled()) {
1166 return;
1169 Vector<UniquePtr<Message>> toProcess;
1171 for (MessageTask* p = mPending.getFirst(); p;) {
1172 p->AssertMonitorHeld(*mMonitor);
1173 UniquePtr<Message>& msg = p->Msg();
1175 MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(),
1176 "Calling ShouldDeferMessage when cancelled");
1177 bool defer = ShouldDeferMessage(*msg);
1179 // Only log the interesting messages.
1180 if (msg->is_sync() ||
1181 msg->nested_level() == IPC::Message::NESTED_INSIDE_CPOW) {
1182 IPC_LOG("ShouldDeferMessage(seqno=%d) = %d", msg->seqno(), defer);
1185 if (!defer) {
1186 MOZ_ASSERT(!IsAlwaysDeferred(*msg));
1188 if (!toProcess.append(std::move(msg))) MOZ_CRASH();
1190 mMaybeDeferredPendingCount--;
1192 p = p->removeAndGetNext();
1193 continue;
1195 p = p->getNext();
1198 if (toProcess.empty()) {
1199 break;
1202 // Processing these messages could result in more messages, so we
1203 // loop around to check for more afterwards.
1205 for (auto& msg : toProcess) {
1206 ProcessPendingRequest(aProxy, std::move(msg));
1210 AssertMaybeDeferredCountCorrect();
1213 bool MessageChannel::Send(UniquePtr<Message> aMsg, UniquePtr<Message>* aReply) {
1214 mozilla::TimeStamp start = TimeStamp::Now();
1215 if (aMsg->size() >= kMinTelemetryMessageSize) {
1216 Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE2, aMsg->size());
1219 // Sanity checks.
1220 AssertWorkerThread();
1221 mMonitor->AssertNotCurrentThreadOwns();
1222 MOZ_RELEASE_ASSERT(!mIsSameThreadChannel,
1223 "sync send over same-thread channel will deadlock!");
1225 RefPtr<ActorLifecycleProxy> proxy = Listener()->GetLifecycleProxy();
1227 #ifdef XP_WIN
1228 SyncStackFrame frame(this);
1229 NeuteredWindowRegion neuteredRgn(mFlags &
1230 REQUIRE_DEFERRED_MESSAGE_PROTECTION);
1231 #endif
1233 AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true);
1235 MonitorAutoLock lock(*mMonitor);
1237 if (mTimedOutMessageSeqno) {
1238 // Don't bother sending another sync message if a previous one timed out
1239 // and we haven't received a reply for it. Once the original timed-out
1240 // message receives a reply, we'll be able to send more sync messages
1241 // again.
1242 IPC_LOG("Send() failed due to previous timeout");
1243 mLastSendError = SyncSendError::PreviousTimeout;
1244 return false;
1247 if (DispatchingSyncMessageNestedLevel() == IPC::Message::NOT_NESTED &&
1248 aMsg->nested_level() > IPC::Message::NOT_NESTED) {
1249 // Don't allow sending CPOWs while we're dispatching a sync message.
1250 IPC_LOG("Nested level forbids send");
1251 mLastSendError = SyncSendError::SendingCPOWWhileDispatchingSync;
1252 return false;
1255 if (DispatchingSyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW ||
1256 DispatchingAsyncMessageNestedLevel() ==
1257 IPC::Message::NESTED_INSIDE_CPOW) {
1258 // Generally only the parent dispatches urgent messages. And the only
1259 // sync messages it can send are NESTED_INSIDE_SYNC. Mainly we want to
1260 // ensure here that we don't return false for non-CPOW messages.
1261 MOZ_RELEASE_ASSERT(aMsg->nested_level() ==
1262 IPC::Message::NESTED_INSIDE_SYNC);
1263 IPC_LOG("Sending while dispatching urgent message");
1264 mLastSendError = SyncSendError::SendingCPOWWhileDispatchingUrgent;
1265 return false;
1268 if (aMsg->nested_level() < DispatchingSyncMessageNestedLevel() ||
1269 aMsg->nested_level() < AwaitingSyncReplyNestedLevel()) {
1270 MOZ_RELEASE_ASSERT(DispatchingSyncMessage() || DispatchingAsyncMessage());
1271 IPC_LOG("Cancel from Send");
1272 auto cancel =
1273 MakeUnique<CancelMessage>(CurrentNestedInsideSyncTransaction());
1274 CancelTransaction(CurrentNestedInsideSyncTransaction());
1275 SendMessageToLink(std::move(cancel));
1278 IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here");
1280 IPC_ASSERT(aMsg->nested_level() >= DispatchingSyncMessageNestedLevel(),
1281 "can't send sync message of a lesser nested level than what's "
1282 "being dispatched");
1283 IPC_ASSERT(AwaitingSyncReplyNestedLevel() <= aMsg->nested_level(),
1284 "nested sync message sends must be of increasing nested level");
1285 IPC_ASSERT(
1286 DispatchingSyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW,
1287 "not allowed to send messages while dispatching urgent messages");
1289 IPC_ASSERT(
1290 DispatchingAsyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW,
1291 "not allowed to send messages while dispatching urgent messages");
1293 if (!Connected()) {
1294 ReportConnectionError("SendAndWait", aMsg->type());
1295 mLastSendError = SyncSendError::NotConnectedBeforeSend;
1296 return false;
1299 aMsg->set_seqno(NextSeqno());
1301 int32_t seqno = aMsg->seqno();
1302 int nestedLevel = aMsg->nested_level();
1303 msgid_t replyType = aMsg->type() + 1;
1305 AutoEnterTransaction* stackTop = mTransactionStack;
1307 // If the most recent message on the stack is NESTED_INSIDE_SYNC, then our
1308 // message should nest inside that and we use the same transaction
1309 // ID. Otherwise we need a new transaction ID (so we use the seqno of the
1310 // message we're sending).
1311 bool nest =
1312 stackTop && stackTop->NestedLevel() == IPC::Message::NESTED_INSIDE_SYNC;
1313 int32_t transaction = nest ? stackTop->TransactionID() : seqno;
1314 aMsg->set_transaction_id(transaction);
1316 AutoEnterTransaction transact(this, seqno, transaction, nestedLevel);
1318 IPC_LOG("Send seqno=%d, xid=%d", seqno, transaction);
1320 // aMsg will be destroyed soon, let's keep its type.
1321 const char* msgName = aMsg->name();
1322 const msgid_t msgType = aMsg->type();
1324 AddProfilerMarker(*aMsg, MessageDirection::eSending);
1325 SendMessageToLink(std::move(aMsg));
1327 while (true) {
1328 MOZ_RELEASE_ASSERT(!transact.IsCanceled());
1329 ProcessPendingRequests(proxy, transact);
1330 if (transact.IsComplete()) {
1331 break;
1333 if (!Connected()) {
1334 ReportConnectionError("Send", msgType);
1335 mLastSendError = SyncSendError::DisconnectedDuringSend;
1336 return false;
1339 MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno);
1340 MOZ_RELEASE_ASSERT(!transact.IsComplete());
1341 MOZ_RELEASE_ASSERT(mTransactionStack == &transact);
1343 bool maybeTimedOut = !WaitForSyncNotify();
1345 if (mListener->NeedArtificialSleep()) {
1346 MonitorAutoUnlock unlock(*mMonitor);
1347 mListener->ArtificialSleep();
1350 if (!Connected()) {
1351 ReportConnectionError("SendAndWait", msgType);
1352 mLastSendError = SyncSendError::DisconnectedDuringSend;
1353 return false;
1356 if (transact.IsCanceled()) {
1357 break;
1360 MOZ_RELEASE_ASSERT(mTransactionStack == &transact);
1362 // We only time out a message if it initiated a new transaction (i.e.,
1363 // if neither side has any other message Sends on the stack).
1364 bool canTimeOut = transact.IsBottom();
1365 if (maybeTimedOut && canTimeOut && !ShouldContinueFromTimeout()) {
1366 // Since ShouldContinueFromTimeout drops the lock, we need to
1367 // re-check all our conditions here. We shouldn't time out if any of
1368 // these things happen because there won't be a reply to the timed
1369 // out message in these cases.
1370 if (transact.IsComplete()) {
1371 break;
1374 IPC_LOG("Timing out Send: xid=%d", transaction);
1376 mTimedOutMessageSeqno = seqno;
1377 mTimedOutMessageNestedLevel = nestedLevel;
1378 mLastSendError = SyncSendError::TimedOut;
1379 return false;
1382 if (transact.IsCanceled()) {
1383 break;
1387 if (transact.IsCanceled()) {
1388 IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction);
1389 mLastSendError = SyncSendError::CancelledAfterSend;
1390 return false;
1393 if (transact.IsError()) {
1394 IPC_LOG("Error: seqno=%d, xid=%d", seqno, transaction);
1395 mLastSendError = SyncSendError::ReplyError;
1396 return false;
1399 uint32_t latencyMs = round((TimeStamp::Now() - start).ToMilliseconds());
1400 IPC_LOG("Got reply: seqno=%d, xid=%d, msgName=%s, latency=%ums", seqno,
1401 transaction, msgName, latencyMs);
1403 UniquePtr<Message> reply = transact.GetReply();
1405 MOZ_RELEASE_ASSERT(reply);
1406 MOZ_RELEASE_ASSERT(reply->is_reply(), "expected reply");
1407 MOZ_RELEASE_ASSERT(!reply->is_reply_error());
1408 MOZ_RELEASE_ASSERT(reply->seqno() == seqno);
1409 MOZ_RELEASE_ASSERT(reply->type() == replyType, "wrong reply type");
1410 MOZ_RELEASE_ASSERT(reply->is_sync());
1412 AddProfilerMarker(*reply, MessageDirection::eReceiving);
1414 if (reply->size() >= kMinTelemetryMessageSize) {
1415 Telemetry::Accumulate(Telemetry::IPC_REPLY_SIZE,
1416 nsDependentCString(msgName), reply->size());
1419 *aReply = std::move(reply);
1421 // NOTE: Only collect IPC_SYNC_MAIN_LATENCY_MS on the main thread (bug
1422 // 1343729)
1423 if (NS_IsMainThread() && latencyMs >= kMinTelemetrySyncIPCLatencyMs) {
1424 Telemetry::Accumulate(Telemetry::IPC_SYNC_MAIN_LATENCY_MS,
1425 nsDependentCString(msgName), latencyMs);
1427 return true;
1430 bool MessageChannel::HasPendingEvents() {
1431 AssertWorkerThread();
1432 mMonitor->AssertCurrentThreadOwns();
1433 return ConnectedOrClosing() && !mPending.isEmpty();
1436 bool MessageChannel::ProcessPendingRequest(ActorLifecycleProxy* aProxy,
1437 UniquePtr<Message> aUrgent) {
1438 AssertWorkerThread();
1439 mMonitor->AssertCurrentThreadOwns();
1441 IPC_LOG("Process pending: seqno=%d, xid=%d", aUrgent->seqno(),
1442 aUrgent->transaction_id());
1444 // keep the error relevant information
1445 msgid_t msgType = aUrgent->type();
1447 DispatchMessage(aProxy, std::move(aUrgent));
1448 if (!ConnectedOrClosing()) {
1449 ReportConnectionError("ProcessPendingRequest", msgType);
1450 return false;
1453 return true;
1456 bool MessageChannel::ShouldRunMessage(const Message& aMsg) {
1457 if (!mTimedOutMessageSeqno) {
1458 return true;
1461 // If we've timed out a message and we're awaiting the reply to the timed
1462 // out message, we have to be careful what messages we process. Here's what
1463 // can go wrong:
1464 // 1. child sends a NOT_NESTED sync message S
1465 // 2. parent sends a NESTED_INSIDE_SYNC sync message H at the same time
1466 // 3. parent times out H
1467 // 4. child starts processing H and sends a NESTED_INSIDE_SYNC message H'
1468 // nested within the same transaction
1469 // 5. parent dispatches S and sends reply
1470 // 6. child asserts because it instead expected a reply to H'.
1472 // To solve this, we refuse to process S in the parent until we get a reply
1473 // to H. More generally, let the timed out message be M. We don't process a
1474 // message unless the child would need the response to that message in order
1475 // to process M. Those messages are the ones that have a higher nested level
1476 // than M or that are part of the same transaction as M.
1477 if (aMsg.nested_level() < mTimedOutMessageNestedLevel ||
1478 (aMsg.nested_level() == mTimedOutMessageNestedLevel &&
1479 aMsg.transaction_id() != mTimedOutMessageSeqno)) {
1480 return false;
1483 return true;
1486 void MessageChannel::RunMessage(ActorLifecycleProxy* aProxy,
1487 MessageTask& aTask) {
1488 AssertWorkerThread();
1489 mMonitor->AssertCurrentThreadOwns();
1490 aTask.AssertMonitorHeld(*mMonitor);
1492 UniquePtr<Message>& msg = aTask.Msg();
1494 if (!ConnectedOrClosing()) {
1495 ReportConnectionError("RunMessage", msg->type());
1496 return;
1499 // Check that we're going to run the first message that's valid to run.
1500 #if 0
1501 # ifdef DEBUG
1502 for (MessageTask* task : mPending) {
1503 if (task == &aTask) {
1504 break;
1507 MOZ_ASSERT(!ShouldRunMessage(*task->Msg()) ||
1508 aTask.Msg()->priority() != task->Msg()->priority());
1511 # endif
1512 #endif
1514 if (!ShouldRunMessage(*msg)) {
1515 return;
1518 MOZ_RELEASE_ASSERT(aTask.isInList());
1519 aTask.remove();
1521 if (!IsAlwaysDeferred(*msg)) {
1522 mMaybeDeferredPendingCount--;
1525 DispatchMessage(aProxy, std::move(msg));
1528 NS_IMPL_ISUPPORTS_INHERITED(MessageChannel::MessageTask, CancelableRunnable,
1529 nsIRunnablePriority, nsIRunnableIPCMessageType)
1531 static uint32_t ToRunnablePriority(IPC::Message::PriorityValue aPriority) {
1532 switch (aPriority) {
1533 case IPC::Message::NORMAL_PRIORITY:
1534 return nsIRunnablePriority::PRIORITY_NORMAL;
1535 case IPC::Message::INPUT_PRIORITY:
1536 return nsIRunnablePriority::PRIORITY_INPUT_HIGH;
1537 case IPC::Message::VSYNC_PRIORITY:
1538 return nsIRunnablePriority::PRIORITY_VSYNC;
1539 case IPC::Message::MEDIUMHIGH_PRIORITY:
1540 return nsIRunnablePriority::PRIORITY_MEDIUMHIGH;
1541 case IPC::Message::CONTROL_PRIORITY:
1542 return nsIRunnablePriority::PRIORITY_CONTROL;
1543 default:
1544 MOZ_ASSERT_UNREACHABLE();
1545 return nsIRunnablePriority::PRIORITY_NORMAL;
1549 MessageChannel::MessageTask::MessageTask(MessageChannel* aChannel,
1550 UniquePtr<Message> aMessage)
1551 : CancelableRunnable(aMessage->name()),
1552 mMonitor(aChannel->mMonitor),
1553 mChannel(aChannel),
1554 mMessage(std::move(aMessage)),
1555 mPriority(ToRunnablePriority(mMessage->priority())),
1556 mScheduled(false)
1557 #ifdef FUZZING_SNAPSHOT
1559 mIsFuzzMsg(mMessage->IsFuzzMsg()),
1560 mFuzzStopped(false)
1561 #endif
1563 MOZ_DIAGNOSTIC_ASSERT(mMessage, "message may not be null");
1564 #ifdef FUZZING_SNAPSHOT
1565 if (mIsFuzzMsg) {
1566 MOZ_FUZZING_IPC_MT_CTOR();
1568 #endif
1571 MessageChannel::MessageTask::~MessageTask() {
1572 #ifdef FUZZING_SNAPSHOT
1573 // We track fuzzing messages until their run is complete. To make sure
1574 // that we don't miss messages that are for some reason destroyed without
1575 // being run (e.g. canceled), we catch this condition in the destructor.
1576 if (mIsFuzzMsg && !mFuzzStopped) {
1577 MOZ_FUZZING_IPC_MT_STOP();
1578 } else if (!mIsFuzzMsg && !fuzzing::Nyx::instance().started()) {
1579 MOZ_FUZZING_IPC_PRE_FUZZ_MT_STOP();
1581 #endif
1584 nsresult MessageChannel::MessageTask::Run() {
1585 mMonitor->AssertNotCurrentThreadOwns();
1587 // Drop the toplevel actor's lifecycle proxy outside of our monitor if we take
1588 // it, as destroying our ActorLifecycleProxy reference can acquire the
1589 // monitor.
1590 RefPtr<ActorLifecycleProxy> proxy;
1592 MonitorAutoLock lock(*mMonitor);
1594 // In case we choose not to run this message, we may need to be able to Post
1595 // it again.
1596 mScheduled = false;
1598 if (!isInList()) {
1599 return NS_OK;
1602 #ifdef FUZZING_SNAPSHOT
1603 if (!mIsFuzzMsg) {
1604 if (fuzzing::Nyx::instance().started()) {
1605 // Once we started fuzzing, prevent non-fuzzing tasks from being
1606 // run and potentially blocking worker threads.
1608 // TODO: This currently blocks all MessageTasks from running, not
1609 // just those belonging to the target process pair. We currently
1610 // do this for performance reasons, but it should be re-evaluated
1611 // at a later stage when we found a better snapshot point.
1612 return NS_OK;
1614 // Record all running tasks prior to fuzzing, so we can wait for
1615 // them to settle before snapshotting.
1616 MOZ_FUZZING_IPC_PRE_FUZZ_MT_RUN();
1618 #endif
1620 Channel()->AssertWorkerThread();
1621 mMonitor->AssertSameMonitor(*Channel()->mMonitor);
1622 proxy = Channel()->Listener()->GetLifecycleProxy();
1623 Channel()->RunMessage(proxy, *this);
1625 #ifdef FUZZING_SNAPSHOT
1626 if (mIsFuzzMsg && !mFuzzStopped) {
1627 MOZ_FUZZING_IPC_MT_STOP();
1628 mFuzzStopped = true;
1630 #endif
1631 return NS_OK;
1634 // Warning: This method removes the receiver from whatever list it might be in.
1635 nsresult MessageChannel::MessageTask::Cancel() {
1636 mMonitor->AssertNotCurrentThreadOwns();
1638 MonitorAutoLock lock(*mMonitor);
1640 if (!isInList()) {
1641 return NS_OK;
1644 Channel()->AssertWorkerThread();
1645 mMonitor->AssertSameMonitor(*Channel()->mMonitor);
1646 if (!IsAlwaysDeferred(*Msg())) {
1647 Channel()->mMaybeDeferredPendingCount--;
1650 remove();
1652 #ifdef FUZZING_SNAPSHOT
1653 if (mIsFuzzMsg && !mFuzzStopped) {
1654 MOZ_FUZZING_IPC_MT_STOP();
1655 mFuzzStopped = true;
1657 #endif
1659 return NS_OK;
1662 void MessageChannel::MessageTask::Post() {
1663 mMonitor->AssertCurrentThreadOwns();
1664 mMonitor->AssertSameMonitor(*Channel()->mMonitor);
1665 MOZ_RELEASE_ASSERT(!mScheduled);
1666 MOZ_RELEASE_ASSERT(isInList());
1668 mScheduled = true;
1670 Channel()->mWorkerThread->Dispatch(do_AddRef(this));
1673 NS_IMETHODIMP
1674 MessageChannel::MessageTask::GetPriority(uint32_t* aPriority) {
1675 *aPriority = mPriority;
1676 return NS_OK;
1679 NS_IMETHODIMP
1680 MessageChannel::MessageTask::GetType(uint32_t* aType) {
1681 mMonitor->AssertNotCurrentThreadOwns();
1683 MonitorAutoLock lock(*mMonitor);
1684 if (!mMessage) {
1685 // If mMessage has been moved already elsewhere, we can't know what the type
1686 // has been.
1687 return NS_ERROR_FAILURE;
1690 *aType = mMessage->type();
1691 return NS_OK;
1694 void MessageChannel::DispatchMessage(ActorLifecycleProxy* aProxy,
1695 UniquePtr<Message> aMsg) {
1696 AssertWorkerThread();
1697 mMonitor->AssertCurrentThreadOwns();
1699 Maybe<AutoNoJSAPI> nojsapi;
1700 if (NS_IsMainThread() && CycleCollectedJSContext::Get()) {
1701 nojsapi.emplace();
1704 UniquePtr<Message> reply;
1706 #ifdef FUZZING_SNAPSHOT
1707 if (IsCrossProcess()) {
1708 aMsg = mozilla::fuzzing::IPCFuzzController::instance().replaceIPCMessage(
1709 std::move(aMsg));
1711 #endif
1713 IPC_LOG("DispatchMessage: seqno=%d, xid=%d", aMsg->seqno(),
1714 aMsg->transaction_id());
1715 AddProfilerMarker(*aMsg, MessageDirection::eReceiving);
1718 AutoEnterTransaction transaction(this, *aMsg);
1720 int id = aMsg->transaction_id();
1721 MOZ_RELEASE_ASSERT(!aMsg->is_sync() || id == transaction.TransactionID());
1724 MonitorAutoUnlock unlock(*mMonitor);
1725 AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true);
1727 mListener->ArtificialSleep();
1729 if (aMsg->is_sync()) {
1730 DispatchSyncMessage(aProxy, *aMsg, reply);
1731 } else {
1732 DispatchAsyncMessage(aProxy, *aMsg);
1735 mListener->ArtificialSleep();
1738 if (reply && transaction.IsCanceled()) {
1739 // The transaction has been canceled. Don't send a reply.
1740 IPC_LOG("Nulling out reply due to cancellation, seqno=%d, xid=%d",
1741 aMsg->seqno(), id);
1742 reply = nullptr;
1746 #ifdef FUZZING_SNAPSHOT
1747 if (aMsg->IsFuzzMsg()) {
1748 mozilla::fuzzing::IPCFuzzController::instance().syncAfterReplace();
1750 #endif
1752 if (reply && ChannelConnected == mChannelState) {
1753 IPC_LOG("Sending reply seqno=%d, xid=%d", aMsg->seqno(),
1754 aMsg->transaction_id());
1755 AddProfilerMarker(*reply, MessageDirection::eSending);
1757 SendMessageToLink(std::move(reply));
1761 void MessageChannel::DispatchSyncMessage(ActorLifecycleProxy* aProxy,
1762 const Message& aMsg,
1763 UniquePtr<Message>& aReply) {
1764 AssertWorkerThread();
1766 mozilla::TimeStamp start = TimeStamp::Now();
1768 int nestedLevel = aMsg.nested_level();
1770 MOZ_RELEASE_ASSERT(nestedLevel == IPC::Message::NOT_NESTED ||
1771 NS_IsMainThread());
1773 MessageChannel* dummy;
1774 MessageChannel*& blockingVar =
1775 mSide == ChildSide && NS_IsMainThread() ? gParentProcessBlocker : dummy;
1777 Result rv;
1779 AutoSetValue<MessageChannel*> blocked(blockingVar, this);
1780 rv = aProxy->Get()->OnMessageReceived(aMsg, aReply);
1783 uint32_t latencyMs = round((TimeStamp::Now() - start).ToMilliseconds());
1784 if (latencyMs >= kMinTelemetrySyncIPCLatencyMs) {
1785 Telemetry::Accumulate(Telemetry::IPC_SYNC_RECEIVE_MS,
1786 nsDependentCString(aMsg.name()), latencyMs);
1789 if (!MaybeHandleError(rv, aMsg, "DispatchSyncMessage")) {
1790 aReply = Message::ForSyncDispatchError(aMsg.nested_level());
1792 aReply->set_seqno(aMsg.seqno());
1793 aReply->set_transaction_id(aMsg.transaction_id());
1796 void MessageChannel::DispatchAsyncMessage(ActorLifecycleProxy* aProxy,
1797 const Message& aMsg) {
1798 AssertWorkerThread();
1799 MOZ_RELEASE_ASSERT(!aMsg.is_sync());
1801 if (aMsg.routing_id() == MSG_ROUTING_NONE) {
1802 NS_WARNING("unhandled special message!");
1803 MaybeHandleError(MsgNotKnown, aMsg, "DispatchAsyncMessage");
1804 return;
1807 Result rv;
1809 int nestedLevel = aMsg.nested_level();
1810 AutoSetValue<bool> async(mDispatchingAsyncMessage, true);
1811 AutoSetValue<int> nestedLevelSet(mDispatchingAsyncMessageNestedLevel,
1812 nestedLevel);
1813 rv = aProxy->Get()->OnMessageReceived(aMsg);
1815 MaybeHandleError(rv, aMsg, "DispatchAsyncMessage");
1818 void MessageChannel::EnqueuePendingMessages() {
1819 AssertWorkerThread();
1820 mMonitor->AssertCurrentThreadOwns();
1822 // XXX performance tuning knob: could process all or k pending
1823 // messages here, rather than enqueuing for later processing
1825 RepostAllMessages();
1828 bool MessageChannel::WaitResponse(bool aWaitTimedOut) {
1829 AssertWorkerThread();
1830 if (aWaitTimedOut) {
1831 if (mInTimeoutSecondHalf) {
1832 // We've really timed out this time.
1833 return false;
1835 // Try a second time.
1836 mInTimeoutSecondHalf = true;
1837 } else {
1838 mInTimeoutSecondHalf = false;
1840 return true;
1843 #ifndef XP_WIN
1844 bool MessageChannel::WaitForSyncNotify() {
1845 AssertWorkerThread();
1846 # ifdef DEBUG
1847 // WARNING: We don't release the lock here. We can't because the link
1848 // could signal at this time and we would miss it. Instead we require
1849 // ArtificialTimeout() to be extremely simple.
1850 if (mListener->ArtificialTimeout()) {
1851 return false;
1853 # endif
1855 MOZ_RELEASE_ASSERT(!mIsSameThreadChannel,
1856 "Wait on same-thread channel will deadlock!");
1858 TimeDuration timeout = (kNoTimeout == mTimeoutMs)
1859 ? TimeDuration::Forever()
1860 : TimeDuration::FromMilliseconds(mTimeoutMs);
1861 CVStatus status = mMonitor->Wait(timeout);
1863 // If the timeout didn't expire, we know we received an event. The
1864 // converse is not true.
1865 return WaitResponse(status == CVStatus::Timeout);
1868 void MessageChannel::NotifyWorkerThread() { mMonitor->Notify(); }
1869 #endif
1871 bool MessageChannel::ShouldContinueFromTimeout() {
1872 AssertWorkerThread();
1873 mMonitor->AssertCurrentThreadOwns();
1875 bool cont;
1877 MonitorAutoUnlock unlock(*mMonitor);
1878 cont = mListener->ShouldContinueFromReplyTimeout();
1879 mListener->ArtificialSleep();
1882 static enum {
1883 UNKNOWN,
1884 NOT_DEBUGGING,
1885 DEBUGGING
1886 } sDebuggingChildren = UNKNOWN;
1888 if (sDebuggingChildren == UNKNOWN) {
1889 sDebuggingChildren =
1890 getenv("MOZ_DEBUG_CHILD_PROCESS") || getenv("MOZ_DEBUG_CHILD_PAUSE")
1891 ? DEBUGGING
1892 : NOT_DEBUGGING;
1894 if (sDebuggingChildren == DEBUGGING) {
1895 return true;
1898 return cont;
1901 void MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs) {
1902 // Set channel timeout value. Since this is broken up into
1903 // two period, the minimum timeout value is 2ms.
1904 AssertWorkerThread();
1905 mTimeoutMs =
1906 (aTimeoutMs <= 0) ? kNoTimeout : (int32_t)ceil((double)aTimeoutMs / 2.0);
1909 void MessageChannel::ReportConnectionError(const char* aFunctionName,
1910 const uint32_t aMsgType) const {
1911 AssertWorkerThread();
1912 mMonitor->AssertCurrentThreadOwns();
1914 const char* errorMsg = nullptr;
1915 switch (mChannelState) {
1916 case ChannelClosed:
1917 errorMsg = "Closed channel: cannot send/recv";
1918 break;
1919 case ChannelClosing:
1920 errorMsg = "Channel closing: too late to send, messages will be lost";
1921 break;
1922 case ChannelError:
1923 errorMsg = "Channel error: cannot send/recv";
1924 break;
1926 default:
1927 MOZ_CRASH("unreached");
1930 // IPC connection errors are fairly common, especially "Channel closing: too
1931 // late to send/recv, messages will be lost", so shouldn't be being reported
1932 // on release builds, as that's misleading as to their severity.
1933 NS_WARNING(nsPrintfCString("IPC Connection Error: [%s][%s] %s(msgname=%s) %s",
1934 StringFromIPCSide(mSide), mName, aFunctionName,
1935 IPC::StringFromIPCMessageType(aMsgType), errorMsg)
1936 .get());
1938 MonitorAutoUnlock unlock(*mMonitor);
1939 mListener->ProcessingError(MsgDropped, errorMsg);
1942 void MessageChannel::ReportMessageRouteError(const char* channelName) const {
1943 PrintErrorMessage(mSide, channelName, "Need a route");
1944 mListener->ProcessingError(MsgRouteError, "MsgRouteError");
1947 bool MessageChannel::MaybeHandleError(Result code, const Message& aMsg,
1948 const char* channelName) {
1949 if (MsgProcessed == code) return true;
1951 #ifdef FUZZING_SNAPSHOT
1952 mozilla::fuzzing::IPCFuzzController::instance().OnMessageError(code, aMsg);
1953 #endif
1955 const char* errorMsg = nullptr;
1956 switch (code) {
1957 case MsgNotKnown:
1958 errorMsg = "Unknown message: not processed";
1959 break;
1960 case MsgNotAllowed:
1961 errorMsg = "Message not allowed: cannot be sent/recvd in this state";
1962 break;
1963 case MsgPayloadError:
1964 errorMsg = "Payload error: message could not be deserialized";
1965 break;
1966 case MsgProcessingError:
1967 errorMsg =
1968 "Processing error: message was deserialized, but the handler "
1969 "returned false (indicating failure)";
1970 break;
1971 case MsgRouteError:
1972 errorMsg = "Route error: message sent to unknown actor ID";
1973 break;
1974 case MsgValueError:
1975 errorMsg =
1976 "Value error: message was deserialized, but contained an illegal "
1977 "value";
1978 break;
1980 default:
1981 MOZ_CRASH("unknown Result code");
1982 return false;
1985 char reason[512];
1986 const char* msgname = aMsg.name();
1987 if (msgname[0] == '?') {
1988 SprintfLiteral(reason, "(msgtype=0x%X) %s", aMsg.type(), errorMsg);
1989 } else {
1990 SprintfLiteral(reason, "%s %s", msgname, errorMsg);
1993 PrintErrorMessage(mSide, channelName, reason);
1995 // Error handled in mozilla::ipc::IPCResult.
1996 if (code == MsgProcessingError) {
1997 return false;
2000 mListener->ProcessingError(code, reason);
2002 return false;
2005 void MessageChannel::OnChannelErrorFromLink() {
2006 mMonitor->AssertCurrentThreadOwns();
2007 MOZ_ASSERT(mChannelState == ChannelConnected);
2009 IPC_LOG("OnChannelErrorFromLink");
2011 if (AwaitingSyncReply()) {
2012 NotifyWorkerThread();
2015 if (mAbortOnError) {
2016 // mAbortOnError is set by main actors (e.g., ContentChild) to ensure
2017 // that the process terminates even if normal shutdown is prevented.
2018 // A MOZ_CRASH() here is not helpful because crash reporting relies
2019 // on the parent process which we know is dead or otherwise unusable.
2021 // Additionally, the parent process can (and often is) killed on Android
2022 // when apps are backgrounded. We don't need to report a crash for
2023 // normal behavior in that case.
2024 printf_stderr("Exiting due to channel error.\n");
2025 ProcessChild::QuickExit();
2027 mChannelState = ChannelError;
2028 mMonitor->Notify();
2030 PostErrorNotifyTask();
2033 void MessageChannel::NotifyMaybeChannelError(ReleasableMonitorAutoLock& aLock) {
2034 AssertWorkerThread();
2035 mMonitor->AssertCurrentThreadOwns();
2036 aLock.AssertCurrentThreadOwns();
2037 MOZ_ASSERT(mChannelState != ChannelConnected);
2039 if (ChannelClosing == mChannelState || ChannelClosed == mChannelState) {
2040 // the channel closed, but we received a "Goodbye" message warning us
2041 // about it. no worries
2042 mChannelState = ChannelClosed;
2043 NotifyChannelClosed(aLock);
2044 return;
2047 MOZ_ASSERT(ChannelError == mChannelState);
2049 Clear();
2051 // IPDL assumes these notifications do not fire twice, so we do not let
2052 // that happen.
2053 if (mNotifiedChannelDone) {
2054 return;
2056 mNotifiedChannelDone = true;
2058 // Let our listener know that the channel errored. This may cause the
2059 // channel to be deleted. Release our caller's `MonitorAutoLock` before
2060 // invoking the listener, as this may call back into MessageChannel, and/or
2061 // cause the channel to be destroyed.
2062 aLock.Unlock();
2063 mListener->OnChannelError();
2066 void MessageChannel::OnNotifyMaybeChannelError() {
2067 AssertWorkerThread();
2068 mMonitor->AssertNotCurrentThreadOwns();
2070 // This lock guard may be reset by `NotifyMaybeChannelError` before invoking
2071 // listener callbacks which may destroy this `MessageChannel`.
2073 // Acquiring the lock here also allows us to ensure that
2074 // `OnChannelErrorFromLink` has finished running before this task is allowed
2075 // to continue.
2076 ReleasableMonitorAutoLock lock(*mMonitor);
2078 mChannelErrorTask = nullptr;
2080 if (IsOnCxxStack()) {
2081 // This used to post a 10ms delayed task; however not all
2082 // nsISerialEventTarget implementations support delayed dispatch.
2083 // The delay being completely arbitrary, we may not as well have any.
2084 PostErrorNotifyTask();
2085 return;
2088 // This call may destroy `this`.
2089 NotifyMaybeChannelError(lock);
2092 void MessageChannel::PostErrorNotifyTask() {
2093 mMonitor->AssertCurrentThreadOwns();
2095 if (mChannelErrorTask) {
2096 return;
2099 // This must be the last code that runs on this thread!
2100 mChannelErrorTask = NewNonOwningCancelableRunnableMethod(
2101 "ipc::MessageChannel::OnNotifyMaybeChannelError", this,
2102 &MessageChannel::OnNotifyMaybeChannelError);
2103 mWorkerThread->Dispatch(do_AddRef(mChannelErrorTask));
2106 // Special async message.
2107 class GoodbyeMessage : public IPC::Message {
2108 public:
2109 GoodbyeMessage() : IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE) {}
2110 static bool Read(const Message* msg) { return true; }
2111 void Log(const std::string& aPrefix, FILE* aOutf) const {
2112 fputs("(special `Goodbye' message)", aOutf);
2116 void MessageChannel::InduceConnectionError() {
2117 MonitorAutoLock lock(*mMonitor);
2119 // Either connected or closing, immediately convert to an error and notify.
2120 switch (mChannelState) {
2121 case ChannelConnected:
2122 // The channel is still actively connected. Immediately shut down the
2123 // connection with our peer and simulate it invoking
2124 // OnChannelErrorFromLink on us.
2126 // This will update the state to ChannelError, preventing new messages
2127 // from being processed, leading to an error being reported asynchronously
2128 // to our listener.
2129 mLink->Close();
2130 OnChannelErrorFromLink();
2131 return;
2133 case ChannelClosing:
2134 // An notify task has already been posted. Update mChannelState to stop
2135 // processing new messages and treat the notification as an error.
2136 mChannelState = ChannelError;
2137 return;
2139 default:
2140 // Either already closed or errored. Nothing to do.
2141 MOZ_ASSERT(mChannelState == ChannelClosed ||
2142 mChannelState == ChannelError);
2143 return;
2147 void MessageChannel::NotifyImpendingShutdown() {
2148 UniquePtr<Message> msg =
2149 MakeUnique<Message>(MSG_ROUTING_NONE, IMPENDING_SHUTDOWN_MESSAGE_TYPE);
2150 MonitorAutoLock lock(*mMonitor);
2151 if (Connected()) {
2152 SendMessageToLink(std::move(msg));
2156 void MessageChannel::Close() {
2157 AssertWorkerThread();
2158 mMonitor->AssertNotCurrentThreadOwns();
2160 // This lock guard may be reset by `Notify{ChannelClosed,MaybeChannelError}`
2161 // before invoking listener callbacks which may destroy this `MessageChannel`.
2162 ReleasableMonitorAutoLock lock(*mMonitor);
2164 switch (mChannelState) {
2165 case ChannelError:
2166 // See bug 538586: if the listener gets deleted while the
2167 // IO thread's NotifyChannelError event is still enqueued
2168 // and subsequently deletes us, then the error event will
2169 // also be deleted and the listener will never be notified
2170 // of the channel error.
2171 NotifyMaybeChannelError(lock);
2172 return;
2173 case ChannelClosed:
2174 // Slightly unexpected but harmless; ignore. See bug 1554244.
2175 return;
2177 default:
2178 // Notify the other side that we're about to close our socket. If we've
2179 // already received a Goodbye from the other side (and our state is
2180 // ChannelClosing), there's no reason to send one.
2181 if (ChannelConnected == mChannelState) {
2182 SendMessageToLink(MakeUnique<GoodbyeMessage>());
2184 mLink->Close();
2185 mChannelState = ChannelClosed;
2186 NotifyChannelClosed(lock);
2187 return;
2191 void MessageChannel::NotifyChannelClosed(ReleasableMonitorAutoLock& aLock) {
2192 AssertWorkerThread();
2193 mMonitor->AssertCurrentThreadOwns();
2194 aLock.AssertCurrentThreadOwns();
2196 if (ChannelClosed != mChannelState) {
2197 MOZ_CRASH("channel should have been closed!");
2200 Clear();
2202 // IPDL assumes these notifications do not fire twice, so we do not let
2203 // that happen.
2204 if (mNotifiedChannelDone) {
2205 return;
2207 mNotifiedChannelDone = true;
2209 // Let our listener know that the channel was closed. This may cause the
2210 // channel to be deleted. Release our caller's `MonitorAutoLock` before
2211 // invoking the listener, as this may call back into MessageChannel, and/or
2212 // cause the channel to be destroyed.
2213 aLock.Unlock();
2214 mListener->OnChannelClose();
2217 void MessageChannel::DebugAbort(const char* file, int line, const char* cond,
2218 const char* why, bool reply) {
2219 AssertWorkerThread();
2220 mMonitor->AssertCurrentThreadOwns();
2222 printf_stderr(
2223 "###!!! [MessageChannel][%s][%s:%d] "
2224 "Assertion (%s) failed. %s %s\n",
2225 StringFromIPCSide(mSide), file, line, cond, why, reply ? "(reply)" : "");
2227 MessageQueue pending = std::move(mPending);
2228 while (!pending.isEmpty()) {
2229 pending.getFirst()->AssertMonitorHeld(*mMonitor);
2230 printf_stderr(" [ %s%s ]\n",
2231 pending.getFirst()->Msg()->is_sync() ? "sync" : "async",
2232 pending.getFirst()->Msg()->is_reply() ? "reply" : "");
2233 pending.popFirst();
2236 MOZ_CRASH_UNSAFE(why);
2239 void MessageChannel::AddProfilerMarker(const IPC::Message& aMessage,
2240 MessageDirection aDirection) {
2241 mMonitor->AssertCurrentThreadOwns();
2243 if (profiler_feature_active(ProfilerFeature::IPCMessages)) {
2244 base::ProcessId pid = mListener->OtherPidMaybeInvalid();
2245 // Only record markers for IPCs with a valid pid.
2246 // And if one of the profiler mutexes is locked on this thread, don't record
2247 // markers, because we don't want to expose profiler IPCs due to the
2248 // profiler itself, and also to avoid possible re-entrancy issues.
2249 if (pid != base::kInvalidProcessId &&
2250 !profiler_is_locked_on_current_thread()) {
2251 // The current timestamp must be given to the `IPCMarker` payload.
2252 [[maybe_unused]] const TimeStamp now = TimeStamp::Now();
2253 bool isThreadBeingProfiled =
2254 profiler_thread_is_being_profiled_for_markers();
2255 PROFILER_MARKER(
2256 "IPC", IPC,
2257 mozilla::MarkerOptions(
2258 mozilla::MarkerTiming::InstantAt(now),
2259 // If the thread is being profiled, add the marker to
2260 // the current thread. If the thread is not being
2261 // profiled, add the marker to the main thread. It
2262 // will appear in the main thread's IPC track. Profiler analysis
2263 // UI correlates all the IPC markers from different threads and
2264 // generates processed markers.
2265 isThreadBeingProfiled ? mozilla::MarkerThreadId::CurrentThread()
2266 : mozilla::MarkerThreadId::MainThread()),
2267 IPCMarker, now, now, pid, aMessage.seqno(), aMessage.type(), mSide,
2268 aDirection, MessagePhase::Endpoint, aMessage.is_sync(),
2269 // aOriginThreadId: If the thread is being profiled, do not include a
2270 // thread ID, as it's the same as the markers. Only include this field
2271 // when the marker is being sent from another thread.
2272 isThreadBeingProfiled ? mozilla::MarkerThreadId{}
2273 : mozilla::MarkerThreadId::CurrentThread());
2278 void MessageChannel::EndTimeout() {
2279 mMonitor->AssertCurrentThreadOwns();
2281 IPC_LOG("Ending timeout of seqno=%d", mTimedOutMessageSeqno);
2282 mTimedOutMessageSeqno = 0;
2283 mTimedOutMessageNestedLevel = 0;
2285 RepostAllMessages();
2288 void MessageChannel::RepostAllMessages() {
2289 mMonitor->AssertCurrentThreadOwns();
2291 bool needRepost = false;
2292 for (MessageTask* task : mPending) {
2293 task->AssertMonitorHeld(*mMonitor);
2294 if (!task->IsScheduled()) {
2295 needRepost = true;
2296 break;
2299 if (!needRepost) {
2300 // If everything is already scheduled to run, do nothing.
2301 return;
2304 // In some cases we may have deferred dispatch of some messages in the
2305 // queue. Now we want to run them again. However, we can't just re-post
2306 // those messages since the messages after them in mPending would then be
2307 // before them in the event queue. So instead we cancel everything and
2308 // re-post all messages in the correct order.
2309 MessageQueue queue = std::move(mPending);
2310 while (RefPtr<MessageTask> task = queue.popFirst()) {
2311 task->AssertMonitorHeld(*mMonitor);
2312 RefPtr<MessageTask> newTask = new MessageTask(this, std::move(task->Msg()));
2313 newTask->AssertMonitorHeld(*mMonitor);
2314 mPending.insertBack(newTask);
2315 newTask->Post();
2318 AssertMaybeDeferredCountCorrect();
2321 void MessageChannel::CancelTransaction(int transaction) {
2322 mMonitor->AssertCurrentThreadOwns();
2324 // When we cancel a transaction, we need to behave as if there's no longer
2325 // any IPC on the stack. Anything we were dispatching or sending will get
2326 // canceled. Consequently, we have to update the state variables below.
2328 // We also need to ensure that when any IPC functions on the stack return,
2329 // they don't reset these values using an RAII class like AutoSetValue. To
2330 // avoid that, these RAII classes check if the variable they set has been
2331 // tampered with (by us). If so, they don't reset the variable to the old
2332 // value.
2334 IPC_LOG("CancelTransaction: xid=%d", transaction);
2336 // An unusual case: We timed out a transaction which the other side then
2337 // cancelled. In this case we just leave the timedout state and try to
2338 // forget this ever happened.
2339 if (transaction == mTimedOutMessageSeqno) {
2340 IPC_LOG("Cancelled timed out message %d", mTimedOutMessageSeqno);
2341 EndTimeout();
2343 // Normally mCurrentTransaction == 0 here. But it can be non-zero if:
2344 // 1. Parent sends NESTED_INSIDE_SYNC message H.
2345 // 2. Parent times out H.
2346 // 3. Child dispatches H and sends nested message H' (same transaction).
2347 // 4. Parent dispatches H' and cancels.
2348 MOZ_RELEASE_ASSERT(!mTransactionStack ||
2349 mTransactionStack->TransactionID() == transaction);
2350 if (mTransactionStack) {
2351 mTransactionStack->Cancel();
2353 } else {
2354 MOZ_RELEASE_ASSERT(mTransactionStack->TransactionID() == transaction);
2355 mTransactionStack->Cancel();
2358 bool foundSync = false;
2359 for (MessageTask* p = mPending.getFirst(); p;) {
2360 p->AssertMonitorHeld(*mMonitor);
2361 UniquePtr<Message>& msg = p->Msg();
2363 // If there was a race between the parent and the child, then we may
2364 // have a queued sync message. We want to drop this message from the
2365 // queue since if will get cancelled along with the transaction being
2366 // cancelled. This happens if the message in the queue is
2367 // NESTED_INSIDE_SYNC.
2368 if (msg->is_sync() && msg->nested_level() != IPC::Message::NOT_NESTED) {
2369 MOZ_RELEASE_ASSERT(!foundSync);
2370 MOZ_RELEASE_ASSERT(msg->transaction_id() != transaction);
2371 IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg->seqno(),
2372 msg->transaction_id());
2373 foundSync = true;
2374 if (!IsAlwaysDeferred(*msg)) {
2375 mMaybeDeferredPendingCount--;
2377 p = p->removeAndGetNext();
2378 continue;
2381 p = p->getNext();
2384 AssertMaybeDeferredCountCorrect();
2387 void MessageChannel::CancelCurrentTransaction() {
2388 MonitorAutoLock lock(*mMonitor);
2389 if (DispatchingSyncMessageNestedLevel() >= IPC::Message::NESTED_INSIDE_SYNC) {
2390 if (DispatchingSyncMessageNestedLevel() ==
2391 IPC::Message::NESTED_INSIDE_CPOW ||
2392 DispatchingAsyncMessageNestedLevel() ==
2393 IPC::Message::NESTED_INSIDE_CPOW) {
2394 mListener->IntentionalCrash();
2397 IPC_LOG("Cancel requested: current xid=%d",
2398 CurrentNestedInsideSyncTransaction());
2399 MOZ_RELEASE_ASSERT(DispatchingSyncMessage());
2400 auto cancel =
2401 MakeUnique<CancelMessage>(CurrentNestedInsideSyncTransaction());
2402 CancelTransaction(CurrentNestedInsideSyncTransaction());
2403 SendMessageToLink(std::move(cancel));
2407 void CancelCPOWs() {
2408 MOZ_ASSERT(NS_IsMainThread());
2410 if (gParentProcessBlocker) {
2411 mozilla::Telemetry::Accumulate(mozilla::Telemetry::IPC_TRANSACTION_CANCEL,
2412 true);
2413 gParentProcessBlocker->CancelCurrentTransaction();
2417 bool MessageChannel::IsCrossProcess() const {
2418 mMonitor->AssertCurrentThreadOwns();
2419 return mIsCrossProcess;
2422 void MessageChannel::SetIsCrossProcess(bool aIsCrossProcess) {
2423 mMonitor->AssertCurrentThreadOwns();
2424 if (aIsCrossProcess == mIsCrossProcess) {
2425 return;
2427 mIsCrossProcess = aIsCrossProcess;
2428 if (mIsCrossProcess) {
2429 ChannelCountReporter::Increment(mName);
2430 } else {
2431 ChannelCountReporter::Decrement(mName);
2435 NS_IMPL_ISUPPORTS(MessageChannel::WorkerTargetShutdownTask,
2436 nsITargetShutdownTask)
2438 MessageChannel::WorkerTargetShutdownTask::WorkerTargetShutdownTask(
2439 nsISerialEventTarget* aTarget, MessageChannel* aChannel)
2440 : mTarget(aTarget), mChannel(aChannel) {}
2442 void MessageChannel::WorkerTargetShutdownTask::TargetShutdown() {
2443 MOZ_RELEASE_ASSERT(mTarget->IsOnCurrentThread());
2444 IPC_LOG("Closing channel due to event target shutdown");
2445 if (MessageChannel* channel = std::exchange(mChannel, nullptr)) {
2446 channel->Close();
2450 void MessageChannel::WorkerTargetShutdownTask::Clear() {
2451 MOZ_RELEASE_ASSERT(mTarget->IsOnCurrentThread());
2452 mChannel = nullptr;
2455 NS_IMPL_ISUPPORTS_INHERITED0(MessageChannel::FlushLazySendMessagesRunnable,
2456 CancelableRunnable)
2458 MessageChannel::FlushLazySendMessagesRunnable::FlushLazySendMessagesRunnable(
2459 MessageChannel* aChannel)
2460 : CancelableRunnable("MessageChannel::FlushLazyMessagesRunnable"),
2461 mChannel(aChannel) {}
2463 NS_IMETHODIMP MessageChannel::FlushLazySendMessagesRunnable::Run() {
2464 if (mChannel) {
2465 MonitorAutoLock lock(*mChannel->mMonitor);
2466 MOZ_ASSERT(mChannel->mFlushLazySendTask == this);
2467 mChannel->FlushLazySendMessages();
2469 return NS_OK;
2472 nsresult MessageChannel::FlushLazySendMessagesRunnable::Cancel() {
2473 mQueue.Clear();
2474 mChannel = nullptr;
2475 return NS_OK;
2478 void MessageChannel::FlushLazySendMessagesRunnable::PushMessage(
2479 UniquePtr<Message> aMsg) {
2480 MOZ_ASSERT(mChannel);
2481 mQueue.AppendElement(std::move(aMsg));
2484 nsTArray<UniquePtr<IPC::Message>>
2485 MessageChannel::FlushLazySendMessagesRunnable::TakeMessages() {
2486 mChannel = nullptr;
2487 return std::move(mQueue);
2490 } // namespace ipc
2491 } // namespace mozilla