1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
5 * You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/ipc/NodeController.h"
8 #include "MainThreadUtils.h"
9 #include "base/process_util.h"
10 #include "chrome/common/ipc_message.h"
11 #include "mojo/core/ports/name.h"
12 #include "mojo/core/ports/node.h"
13 #include "mojo/core/ports/port_locker.h"
14 #include "mozilla/AlreadyAddRefed.h"
15 #include "mozilla/RandomNum.h"
16 #include "mozilla/StaticPtr.h"
17 #include "mozilla/Assertions.h"
18 #include "mozilla/ToString.h"
19 #include "mozilla/ipc/BrowserProcessSubThread.h"
20 #include "mozilla/ipc/ProtocolUtils.h"
21 #include "mozilla/mozalloc.h"
22 #include "nsISerialEventTarget.h"
24 #include "nsXULAppAPI.h"
25 #include "nsPrintfCString.h"
// Evaluates the ports-library call `expr` and checks, via MOZ_ALWAYS_TRUE,
// that it produced `mojo::core::ports::OK`.
27 #define PORTS_ALWAYS_OK(expr) MOZ_ALWAYS_TRUE(mojo::core::ports::OK == (expr))
29 namespace mozilla::ipc
{
31 static StaticRefPtr
<NodeController
> gNodeController
;
33 static LazyLogModule gNodeControllerLog
{"NodeController"};
// Logging helpers. Both macros expand to code that reads `mName`, so they can
// only be used where a NodeController's `mName` member is in scope.
// NOTE(review): several lines of the NODECONTROLLER_WARNING definition are not
// visible in this view. The two `#define NODECONTROLLER_WARNING` lines are
// presumably the two arms of a preprocessor conditional -- confirm against the
// complete file.
35 // Helper logger macro which includes the name of the `this` NodeController in
36 // the logged messages.
37 #define NODECONTROLLER_LOG(level_, fmt_, ...) \
38 MOZ_LOG(gNodeControllerLog, level_, \
39 ("[%s]: " fmt_, ToString(mName).c_str(), ##__VA_ARGS__))
41 // Helper warning macro which both does logger logging and emits NS_WARNING logs
44 # define NODECONTROLLER_WARNING(fmt_, ...) \
46 nsPrintfCString warning("[%s]: " fmt_, ToString(mName).c_str(), \
48 NS_WARNING(warning.get()); \
49 MOZ_LOG(gNodeControllerLog, LogLevel::Debug, ("%s", warning.get())); \
52 # define NODECONTROLLER_WARNING(fmt_, ...) \
53 NODECONTROLLER_LOG(LogLevel::Warning, fmt_, ##__VA_ARGS__)
56 NodeController::NodeController(const NodeName
& aName
)
57 : mName(aName
), mNode(MakeUnique
<Node
>(aName
, this)) {}
59 NodeController::~NodeController() {
60 auto state
= mState
.Lock();
61 MOZ_RELEASE_ASSERT(state
->mPeers
.IsEmpty(),
62 "Destroying NodeController before closing all peers");
63 MOZ_RELEASE_ASSERT(state
->mInvites
.IsEmpty(),
64 "Destroying NodeController before closing all invites");
67 // FIXME: Actually provide some way to create the thing.
68 /* static */ NodeController
* NodeController::GetSingleton() {
69 MOZ_ASSERT(gNodeController
);
70 return gNodeController
;
// Asks `mNode` to create a pair of ports and returns them as two ScopedPorts,
// each constructed with this controller. The result of the underlying
// `CreatePortPair` call is checked with PORTS_ALWAYS_OK.
// NOTE(review): the declaration of `port0` / `port1` is not visible in this
// view.
73 std::pair
<ScopedPort
, ScopedPort
> NodeController::CreatePortPair() {
75 PORTS_ALWAYS_OK(mNode
->CreatePortPair(&port0
, &port1
));
76 return {ScopedPort
{std::move(port0
), this},
77 ScopedPort
{std::move(port1
), this}};
// Looks up the port named `aName` on `mNode`. Any result other than
// `mojo::core::ports::OK` is reported through NS_WARN_IF and
// NODECONTROLLER_WARNING.
// NOTE(review): the declaration of `port` and the return statements are not
// visible in this view.
80 auto NodeController::GetPort(const PortName
& aName
) -> PortRef
{
82 int rv
= mNode
->GetPort(aName
, &port
);
83 if (NS_WARN_IF(rv
!= mojo::core::ports::OK
)) {
84 NODECONTROLLER_WARNING("Call to GetPort(%s) Failed",
85 ToString(aName
).c_str());
91 void NodeController::SetPortObserver(const PortRef
& aPort
,
92 PortObserver
* aObserver
) {
93 PORTS_ALWAYS_OK(mNode
->SetUserData(aPort
, aObserver
));
// Queries `mNode` for the status of `aPort`; the declared return type is
// `Maybe<PortStatus>`.
// NOTE(review): the declaration of `status` and both return paths are not
// visible in this view. Presumably a result other than OK yields an empty
// Maybe -- confirm against the complete file.
96 auto NodeController::GetStatus(const PortRef
& aPort
) -> Maybe
<PortStatus
> {
98 int rv
= mNode
->GetStatus(aPort
, &status
);
99 if (rv
!= mojo::core::ports::OK
) {
105 void NodeController::ClosePort(const PortRef
& aPort
) {
106 PORTS_ALWAYS_OK(mNode
->ClosePort(aPort
));
// Reads a message from `aPort` and stores it in `*aMessage`.
//
// A UserMessageEvent is fetched from `mNode` and its IPC::Message is taken out
// of it. If the event carries ports, each one is looked up with GetPort() and
// wrapped in a ScopedPort, and the resulting array is attached to the
// IPC::Message before it is handed to the caller.
//
// A result of ERROR_PORT_PEER_CLOSED is tested for separately from other
// failures, and an MOZ_CRASH("GetMessage on port in invalid state") exists on
// the failure path.
// NOTE(review): the return statements and parts of the error handling are not
// visible in this view, so the meaning of the bool result cannot be confirmed
// from here.
109 bool NodeController::GetMessage(const PortRef
& aPort
,
110 UniquePtr
<IPC::Message
>* aMessage
) {
111 UniquePtr
<UserMessageEvent
> messageEvent
;
112 int rv
= mNode
->GetMessage(aPort
, &messageEvent
, nullptr);
113 if (rv
!= mojo::core::ports::OK
) {
114 if (rv
== mojo::core::ports::ERROR_PORT_PEER_CLOSED
) {
117 MOZ_CRASH("GetMessage on port in invalid state");
// Take ownership of the IPC::Message carried by the event.
121 UniquePtr
<IPC::Message
> message
= messageEvent
->TakeMessage
<IPC::Message
>();
123 // If our UserMessageEvent has any ports directly attached to it, fetch them
124 // from our node and attach them to the IPC::Message we extracted.
126 // It's important to only do this if we have nonempty set of ports on the
127 // event, as we may have never serialized our IPC::Message's ports onto the
128 // event if it was routed in-process.
129 if (messageEvent
->num_ports() > 0) {
130 nsTArray
<ScopedPort
> attachedPorts(messageEvent
->num_ports());
131 for (size_t i
= 0; i
< messageEvent
->num_ports(); ++i
) {
132 attachedPorts
.AppendElement(
133 ScopedPort
{GetPort(messageEvent
->ports()[i
]), this});
135 message
->SetAttachedPorts(std::move(attachedPorts
));
138 *aMessage
= std::move(message
);
// Wraps `aMessage` in a newly created UserMessageEvent and sends it from
// `aPort` through `mNode`.
//
// The result is compared against OK first. ERROR_PORT_PEER_CLOSED is logged at
// Debug level only ("Ignoring message to port ..."); a
// NODECONTROLLER_WARNING("Failed to send message to port %s") is also present,
// presumably for every other failure.
// NOTE(review): the return statements are not visible in this view, so the
// meaning of the bool result cannot be confirmed from here.
145 bool NodeController::SendUserMessage(const PortRef
& aPort
,
146 UniquePtr
<IPC::Message
> aMessage
) {
147 auto messageEvent
= MakeUnique
<UserMessageEvent
>(0);
148 messageEvent
->AttachMessage(std::move(aMessage
));
150 int rv
= mNode
->SendUserMessage(aPort
, std::move(messageEvent
));
151 if (rv
== mojo::core::ports::OK
) {
154 if (rv
== mojo::core::ports::ERROR_PORT_PEER_CLOSED
) {
155 NODECONTROLLER_LOG(LogLevel::Debug
,
156 "Ignoring message to port %s as peer was closed",
157 ToString(aPort
.name()).c_str());
160 NODECONTROLLER_WARNING("Failed to send message to port %s",
161 ToString(aPort
.name()).c_str());
// Serializes `aEvent` into an IPC::Message, storing the event bytes in the
// message footer.
//
// For a kUserMessage event, the IPC::Message already attached to the event is
// taken and reused, and `aType` is diagnostic-asserted to be
// EVENT_MESSAGE_TYPE. A fresh MSG_ROUTING_CONTROL message of type `aType` is
// also constructed in this function, presumably for every other event type.
// The message's relay flag is set to `aRelayTarget != nullptr`.
//
// Footer layout as written by the visible code: either the serialized event
// alone, or `*aRelayTarget` (sizeof(NodeName) bytes) followed by the
// serialized event. The footer size recorded on the message is the length of
// the intermediate buffer.
//
// NOTE(review): the `aType` parameter declaration, the conditions selecting
// between the branches above, and the return statement are not visible in
// this view.
165 auto NodeController::SerializeEventMessage(UniquePtr
<Event
> aEvent
,
166 const NodeName
* aRelayTarget
,
168 -> UniquePtr
<IPC::Message
> {
169 UniquePtr
<IPC::Message
> message
;
170 if (aEvent
->type() == Event::kUserMessage
) {
171 MOZ_DIAGNOSTIC_ASSERT(
172 aType
== EVENT_MESSAGE_TYPE
,
173 "Can only send a UserMessage in an EVENT_MESSAGE_TYPE");
174 message
= static_cast<UserMessageEvent
*>(aEvent
.get())
175 ->TakeMessage
<IPC::Message
>();
177 message
= MakeUnique
<IPC::Message
>(MSG_ROUTING_CONTROL
, aType
);
180 message
->set_relay(aRelayTarget
!= nullptr);
// Compute the footer length: the serialized event, plus room for a NodeName.
182 size_t length
= aEvent
->GetSerializedSize();
184 length
+= sizeof(NodeName
);
187 // Use an intermediate buffer to serialize to avoid potential issues with the
188 // segmented `IPC::Message` bufferlist. This should be fairly cheap, as the
189 // majority of events are fairly small.
190 Vector
<char, 256, InfallibleAllocPolicy
> buffer
;
191 (void)buffer
.initLengthUninitialized(length
);
193 memcpy(buffer
.begin(), aRelayTarget
, sizeof(NodeName
));
194 aEvent
->Serialize(buffer
.begin() + sizeof(NodeName
));
196 aEvent
->Serialize(buffer
.begin());
199 message
->WriteFooter(buffer
.begin(), buffer
.length());
200 message
->set_event_footer_size(buffer
.length());
203 // Debug-assert that we can read the same data back out of the buffer.
204 MOZ_ASSERT(message
->event_footer_size() == length
);
205 Vector
<char, 256, InfallibleAllocPolicy
> buffer2
;
206 (void)buffer2
.initLengthUninitialized(message
->event_footer_size());
207 MOZ_ASSERT(message
->ReadFooter(buffer2
.begin(), buffer2
.length(),
208 /* truncate */ false));
209 MOZ_ASSERT(!memcmp(buffer2
.begin(), buffer
.begin(), buffer
.length()));
// Reconstructs an Event from the footer of `aMessage`.
//
// A relay message received without an `aRelayTarget` out-parameter is
// reported as unexpected. The footer is read with truncation, so the footer
// bytes are removed from the message, and the recorded footer size is reset
// to 0. When a relay target is read, the first sizeof(NodeName) footer bytes
// are copied into `*aRelayTarget` and the event is deserialized from the
// remainder; otherwise the whole footer is deserialized. For a kUserMessage
// event, `aMessage` itself is attached to the resulting event.
//
// NOTE(review): the conditions selecting between the relay and non-relay
// paths and all return statements are not visible in this view. Presumably
// each warning path returns a null event -- confirm against the complete file.
215 auto NodeController::DeserializeEventMessage(UniquePtr
<IPC::Message
> aMessage
,
216 NodeName
* aRelayTarget
)
217 -> UniquePtr
<Event
> {
218 if (aMessage
->is_relay() && !aRelayTarget
) {
219 NODECONTROLLER_WARNING("Unexpected relay message '%s'", aMessage
->name());
223 Vector
<char, 256, InfallibleAllocPolicy
> buffer
;
224 (void)buffer
.initLengthUninitialized(aMessage
->event_footer_size());
225 // Truncate the message when reading the footer, so that the extra footer data
226 // is no longer present in the message. This allows future code to eventually
227 // send the same `IPC::Message` to another process.
228 if (!aMessage
->ReadFooter(buffer
.begin(), buffer
.length(),
229 /* truncate */ true)) {
230 NODECONTROLLER_WARNING("Call to ReadFooter for message '%s' Failed",
234 aMessage
->set_event_footer_size(0);
236 UniquePtr
<Event
> event
;
238 MOZ_ASSERT(aMessage
->is_relay());
// The footer must be large enough to hold the leading NodeName.
239 if (buffer
.length() < sizeof(NodeName
)) {
240 NODECONTROLLER_WARNING(
241 "Insufficient space in message footer for message '%s'",
245 memcpy(aRelayTarget
, buffer
.begin(), sizeof(NodeName
));
246 event
= Event::Deserialize(buffer
.begin() + sizeof(NodeName
),
247 buffer
.length() - sizeof(NodeName
));
249 event
= Event::Deserialize(buffer
.begin(), buffer
.length());
253 NODECONTROLLER_WARNING("Call to Event::Deserialize for message '%s' Failed",
258 if (event
->type() == Event::kUserMessage
) {
259 static_cast<UserMessageEvent
*>(event
.get())
260 ->AttachMessage(std::move(aMessage
));
265 already_AddRefed
<NodeChannel
> NodeController::GetNodeChannel(
266 const NodeName
& aName
) {
267 auto state
= mState
.Lock();
268 return do_AddRef(state
->mPeers
.Get(aName
));
// Forgets everything this controller tracks about the peer `aNodeName`.
//
// With the state lock held, the peer's entries are removed from `mPeers`,
// `mPendingMessages`, `mInvites` and `mPendingMerges`, with the removed
// channel, invite and pending-merge ports captured in locals. After that the
// invite's channel (if any) is closed, the invite's `mToMerge` port (if valid)
// and every pending-merge port are closed with errors ignored, and `mNode` is
// told that the connection to the node was lost.
//
// NOTE(review): the declaration of `invite`, the end of the FUZZING_SNAPSHOT
// conditional, and any handling of `channel` beyond logging its pid are not
// visible in this view.
271 void NodeController::DropPeer(NodeName aNodeName
) {
274 #ifdef FUZZING_SNAPSHOT
275 MOZ_FUZZING_IPC_DROP_PEER("NodeController::DropPeer");
279 RefPtr
<NodeChannel
> channel
;
280 nsTArray
<PortRef
> pendingMerges
;
282 auto state
= mState
.Lock();
283 state
->mPeers
.Remove(aNodeName
, &channel
);
284 state
->mPendingMessages
.Remove(aNodeName
);
285 state
->mInvites
.Remove(aNodeName
, &invite
);
286 state
->mPendingMerges
.Remove(aNodeName
, &pendingMerges
);
289 NODECONTROLLER_LOG(LogLevel::Info
, "Dropping Peer %s (pid: %" PRIPID
")",
290 ToString(aNodeName
).c_str(),
291 channel
? channel
->OtherPid() : base::kInvalidProcessId
);
296 if (invite
.mChannel
) {
297 invite
.mChannel
->Close();
299 if (invite
.mToMerge
.is_valid()) {
300 // Ignore any possible errors here.
301 (void)mNode
->ClosePort(invite
.mToMerge
);
303 for (auto& port
: pendingMerges
) {
304 // Ignore any possible errors here.
305 (void)mNode
->ClosePort(port
);
307 mNode
->LostConnectionToNode(aNodeName
);
// Delivers `aEvent` to the node `aNode`.
//
// Visible behavior:
//  * An event addressed to this node (`aNode == mName`) is passed to
//    `mNode->AcceptEvent`.
//  * On XP_WIN / XP_MACOSX, `needsRelay` is computed for user-message events
//    sent by a non-broker node to a non-broker node: it depends on the event
//    having a message with at least one relayed attachment.
//  * The event is serialized, with `&aNode` as the relay target when
//    `needsRelay` is set.
//  * With the state lock held, the peer channel is looked up. When there is no
//    peer or a relay is needed, the broker channel is looked up as well;
//    messages to unknown peers or without a broker are dropped with a warning.
//    A message may instead be queued in `mPendingMessages`, and
//    `needsIntroduction` is set when that queue is newly created.
//  * Finally the message is sent to the broker (relay), an introduction to
//    `aNode` is requested from the broker, or the message is sent directly to
//    the peer.
//
// NOTE(review): many lines (returns, `else` arms, some conditions, `#endif`)
// are not visible in this view, so the exact conditions guarding each step
// above cannot be confirmed from here.
310 void NodeController::ForwardEvent(const NodeName
& aNode
,
311 UniquePtr
<Event
> aEvent
) {
312 if (aNode
== mName
) {
313 (void)mNode
->AcceptEvent(mName
, std::move(aEvent
));
315 // On Windows and macOS, messages holding HANDLEs or mach ports must be
316 // relayed via the broker process so it can transfer ownership.
317 bool needsRelay
= false;
318 #if defined(XP_WIN) || defined(XP_MACOSX)
319 if (!IsBroker() && aNode
!= kBrokerNodeName
&&
320 aEvent
->type() == Event::kUserMessage
) {
321 auto* userEvent
= static_cast<UserMessageEvent
*>(aEvent
.get());
323 userEvent
->HasMessage() &&
324 userEvent
->GetMessage
<IPC::Message
>()->num_relayed_attachments() > 0;
328 UniquePtr
<IPC::Message
> message
=
329 SerializeEventMessage(std::move(aEvent
), needsRelay
? &aNode
: nullptr);
330 MOZ_ASSERT(message
->is_relay() == needsRelay
,
331 "Message relay status set incorrectly");
333 RefPtr
<NodeChannel
> peer
;
334 RefPtr
<NodeChannel
> broker
;
335 bool needsIntroduction
= false;
337 auto state
= mState
.Lock();
339 // Check if we know this peer. If we don't, we'll need to request an
341 peer
= state
->mPeers
.Get(aNode
);
342 if (!peer
|| needsRelay
) {
344 NODECONTROLLER_WARNING("Ignoring message '%s' to unknown peer %s",
345 message
->name(), ToString(aNode
).c_str());
349 broker
= state
->mPeers
.Get(kBrokerNodeName
);
351 NODECONTROLLER_WARNING(
352 "Ignoring message '%s' to peer %s due to a missing broker",
353 message
->name(), ToString(aNode
).c_str());
// The callback only runs when no queue exists yet for `aNode`.
359 state
->mPendingMessages
.LookupOrInsertWith(aNode
, [&]() {
360 needsIntroduction
= true;
361 return Queue
<UniquePtr
<IPC::Message
>, 64>{};
363 queue
.Push(std::move(message
));
368 MOZ_ASSERT(!needsIntroduction
|| !needsRelay
,
369 "Only one of the two should ever be set");
372 NODECONTROLLER_LOG(LogLevel::Info
,
373 "Relaying message '%s' for peer %s due to %" PRIu32
375 message
->name(), ToString(aNode
).c_str(),
376 message
->num_relayed_attachments());
377 MOZ_ASSERT(message
->num_relayed_attachments() > 0 && broker
);
378 broker
->SendEventMessage(std::move(message
));
379 } else if (needsIntroduction
) {
381 broker
->RequestIntroduction(aNode
);
383 peer
->SendEventMessage(std::move(message
));
// Serializes `aEvent` as a BROADCAST_MESSAGE_TYPE message (no relay target)
// and broadcasts it: either handled locally through OnBroadcast() with this
// node as the sender, or forwarded to the broker channel's Broadcast(). If
// neither applies, a warning about the missing broker connection is emitted.
// NOTE(review): the condition of the first branch is not visible in this
// view; presumably it tests whether this node is the broker -- confirm.
388 void NodeController::BroadcastEvent(UniquePtr
<Event
> aEvent
) {
389 UniquePtr
<IPC::Message
> message
=
390 SerializeEventMessage(std::move(aEvent
), nullptr, BROADCAST_MESSAGE_TYPE
);
393 OnBroadcast(mName
, std::move(message
));
394 } else if (RefPtr
<NodeChannel
> broker
= GetNodeChannel(kBrokerNodeName
)) {
395 broker
->Broadcast(std::move(message
));
397 NODECONTROLLER_WARNING(
398 "Trying to broadcast event, but no connection to broker");
// Notifies the observer attached to `aPortRef` that the port's status changed.
// The port's user data is fetched from `mNode`, cast to PortObserver, and its
// OnPortStatusChanged() is called. A failed GetUserData call is reported with
// a warning.
// NOTE(review): the return after the warning, and any null check on
// `userData`, are not visible in this view.
402 void NodeController::PortStatusChanged(const PortRef
& aPortRef
) {
403 RefPtr
<UserData
> userData
;
404 int rv
= mNode
->GetUserData(aPortRef
, &userData
);
405 if (rv
!= mojo::core::ports::OK
) {
406 NODECONTROLLER_WARNING("GetUserData call for port '%s' failed",
407 ToString(aPortRef
.name()).c_str());
411 // All instances of `UserData` attached to ports in this node must be of
412 // type `PortObserver`, so we can call `OnPortStatusChanged` directly on
414 static_cast<PortObserver
*>(userData
.get())->OnPortStatusChanged();
// Handles an event message received from the peer `aFromNode`.
//
// Visible behavior:
//  * A relay message without relayed attachments is rejected with a warning.
//  * The message is deserialized; for relay messages the relay target is read
//    into `relayTarget`. A failed deserialization is reported with a warning.
//  * On XP_WIN / XP_MACOSX, relay handling follows: a relayed event that is
//    not a user message is rejected. On the forwarding path the event is
//    re-serialized with `&aFromNode` as the relay target and sent to the
//    `relayTarget` peer (dropped with a warning if that peer is unknown). On
//    the receiving path a relayed message must come from the broker, and
//    `fromNode` is replaced with `relayTarget`.
//  * A kMergePort event must name an existing port, and that port must be
//    listed in `mPendingMerges` for `aFromNode`; the matching entry is removed
//    (and the table entry dropped once empty). Otherwise a warning is emitted.
//  * The event is finally passed to `mNode->AcceptEvent` with `fromNode` as
//    the source.
//
// NOTE(review): many lines (conditions such as the broker / relay checks,
// returns after warnings, `#endif`, lambda returns) are not visible in this
// view, so the exact control flow cannot be confirmed from here. Presumably
// each warning path drops the peer or returns early -- confirm.
418 void NodeController::OnEventMessage(const NodeName
& aFromNode
,
419 UniquePtr
<IPC::Message
> aMessage
) {
422 bool isRelay
= aMessage
->is_relay();
423 if (isRelay
&& aMessage
->num_relayed_attachments() == 0) {
424 NODECONTROLLER_WARNING(
425 "Invalid relay message without relayed attachments from peer %s!",
426 ToString(aFromNode
).c_str());
431 NodeName relayTarget
;
432 UniquePtr
<Event
> event
= DeserializeEventMessage(
433 std::move(aMessage
), isRelay
? &relayTarget
: nullptr);
435 NODECONTROLLER_WARNING("Invalid EventMessage from peer %s!",
436 ToString(aFromNode
).c_str());
// `fromNode` is the source reported to the ports node; it starts out as the
// sending peer and is overwritten below for relayed messages.
441 NodeName fromNode
= aFromNode
;
442 #if defined(XP_WIN) || defined(XP_MACOSX)
444 if (event
->type() != Event::kUserMessage
) {
445 NODECONTROLLER_WARNING(
446 "Unexpected relay of non-UserMessage event from peer %s!",
447 ToString(aFromNode
).c_str());
452 // If we're the broker, then we'll need to forward this message on to the
453 // true recipient. To do this, we re-serialize the message, passing along
454 // the original source node, and send it to the final node.
456 UniquePtr
<IPC::Message
> message
=
457 SerializeEventMessage(std::move(event
), &aFromNode
);
459 NODECONTROLLER_WARNING(
460 "Relaying EventMessage from peer %s failed to re-serialize!",
461 ToString(aFromNode
).c_str());
465 MOZ_ASSERT(message
->is_relay(), "Message stopped being a relay message?");
466 MOZ_ASSERT(message
->num_relayed_attachments() > 0,
467 "Message doesn't have relayed attachments?");
471 "Relaying message '%s' from peer %s to peer %s (%" PRIu32
473 message
->name(), ToString(aFromNode
).c_str(),
474 ToString(relayTarget
).c_str(), message
->num_relayed_attachments());
476 RefPtr
<NodeChannel
> peer
;
478 auto state
= mState
.Lock();
479 peer
= state
->mPeers
.Get(relayTarget
);
482 NODECONTROLLER_WARNING(
483 "Dropping relayed message from %s to unknown peer %s",
484 ToString(aFromNode
).c_str(), ToString(relayTarget
).c_str());
488 peer
->SendEventMessage(std::move(message
));
492 // Otherwise, we're the final recipient, so we can continue & process the
494 if (aFromNode
!= kBrokerNodeName
) {
495 NODECONTROLLER_WARNING(
496 "Unexpected relayed EventMessage from non-broker peer %s!",
497 ToString(aFromNode
).c_str());
501 fromNode
= relayTarget
;
503 NODECONTROLLER_LOG(LogLevel::Info
, "Got relayed message from peer %s",
504 ToString(fromNode
).c_str());
508 // If we're getting a requested port merge from another process, check to make
509 // sure that we're expecting the request, and record that the merge has
510 // arrived so we don't try to close the port on error.
511 if (event
->type() == Event::kMergePort
) {
512 // Check that the target port for the merge actually exists.
513 auto targetPort
= GetPort(event
->port_name());
514 if (!targetPort
.is_valid()) {
515 NODECONTROLLER_WARNING(
516 "Unexpected MergePortEvent from peer %s for unknown port %s",
517 ToString(fromNode
).c_str(), ToString(event
->port_name()).c_str());
522 // Check if `targetPort` is in our pending merges entry for the given source
523 // node. If this makes the `mPendingMerges` entry empty, remove it.
524 bool expectingMerge
= [&] {
525 auto state
= mState
.Lock();
526 auto pendingMerges
= state
->mPendingMerges
.Lookup(aFromNode
);
527 if (!pendingMerges
) {
530 size_t removed
= pendingMerges
->RemoveElementsBy(
531 [&](auto& port
) { return port
.name() == targetPort
.name(); });
532 if (removed
!= 0 && pendingMerges
->IsEmpty()) {
533 pendingMerges
.Remove();
538 if (!expectingMerge
) {
539 NODECONTROLLER_WARNING(
540 "Unexpected MergePortEvent from peer %s for port %s",
541 ToString(fromNode
).c_str(), ToString(event
->port_name()).c_str());
547 (void)mNode
->AcceptEvent(fromNode
, std::move(event
));
// Handles a broadcast request: deserializes the event from `aMessage` and
// sends a clone of it to every known peer.
//
// The message type is diagnostic-asserted to be BROADCAST_MESSAGE_TYPE. The
// list of peer channels is copied while the state lock is held, and the loop
// that sends to each peer iterates over that copy. For every peer the event
// is cloned with CloneForBroadcast(), re-serialized and sent; a clone failure
// is reported as an unsupported message.
//
// NOTE(review): the non-broker check, the null checks on `event` / `clone`
// and the returns after warnings are not visible in this view.
550 void NodeController::OnBroadcast(const NodeName
& aFromNode
,
551 UniquePtr
<IPC::Message
> aMessage
) {
552 MOZ_DIAGNOSTIC_ASSERT(aMessage
->type() == BROADCAST_MESSAGE_TYPE
);
554 // NOTE: This method may be called off of the IO thread by the
555 // `BroadcastEvent` node callback.
557 NODECONTROLLER_WARNING("Broadcast request received by non-broker node");
561 UniquePtr
<Event
> event
= DeserializeEventMessage(std::move(aMessage
));
563 NODECONTROLLER_WARNING("Invalid broadcast message from peer");
// Snapshot the peer channels under the lock.
567 nsTArray
<RefPtr
<NodeChannel
>> peers
;
569 auto state
= mState
.Lock();
570 peers
.SetCapacity(state
->mPeers
.Count());
571 for (const auto& peer
: state
->mPeers
.Values()) {
572 peers
.AppendElement(peer
);
575 for (const auto& peer
: peers
) {
576 // NOTE: This `clone` operation is only supported for a limited number of
577 // message types by the ports API, which provides some extra security by
578 // only allowing those specific types of messages to be broadcasted.
579 // Messages which don't support `CloneForBroadcast` cannot be broadcast, and
580 // the ports library will not attempt to broadcast them.
581 auto clone
= event
->CloneForBroadcast();
583 NODECONTROLLER_WARNING("Attempt to broadcast unsupported message");
587 peer
->SendEventMessage(SerializeEventMessage(std::move(clone
)));
// Handles an introduction to another node, sent by `aFromNode`.
//
// Visible behavior:
//  * An introduction from a node other than the broker is reported with a
//    warning.
//  * An introduction without a handle means the peer could not be introduced:
//    `mNode` is told the connection to `aIntroduction.mName` is lost and any
//    pending messages for it are removed under the state lock.
//  * Otherwise an IPC::Channel is built from the handle, mode and other pid,
//    and wrapped in a new NodeChannel named `aIntroduction.mName`.
//  * With the state lock held, the channel is inserted into `mPeers` unless an
//    entry already exists (duplicate introduction, in which case the new
//    channel is closed), and queued pending messages for the peer are handed
//    to the channel.
//  * `nodeChannel->Start()` is called last; the existing comment states this
//    must happen without the lock held.
//
// NOTE(review): the returns after warnings, the declaration of `channel`, the
// body of the LookupOrInsertWith callback and the removal of the pending
// messages entry are not visible in this view.
591 void NodeController::OnIntroduce(const NodeName
& aFromNode
,
592 NodeChannel::Introduction aIntroduction
) {
595 if (aFromNode
!= kBrokerNodeName
) {
596 NODECONTROLLER_WARNING("Introduction received from non-broker node");
601 MOZ_ASSERT(aIntroduction
.mMyPid
== base::GetCurrentProcId(),
602 "We're the wrong process to receive this?");
604 if (!aIntroduction
.mHandle
) {
605 NODECONTROLLER_WARNING("Could not be introduced to peer %s",
606 ToString(aIntroduction
.mName
).c_str());
607 mNode
->LostConnectionToNode(aIntroduction
.mName
);
609 auto state
= mState
.Lock();
610 state
->mPendingMessages
.Remove(aIntroduction
.mName
);
615 MakeUnique
<IPC::Channel
>(std::move(aIntroduction
.mHandle
),
616 aIntroduction
.mMode
, aIntroduction
.mOtherPid
);
617 auto nodeChannel
= MakeRefPtr
<NodeChannel
>(
618 aIntroduction
.mName
, std::move(channel
), this, aIntroduction
.mOtherPid
);
621 auto state
= mState
.Lock();
623 state
->mPeers
.LookupOrInsertWith(aIntroduction
.mName
, [&]() {
628 // We got a duplicate introduction. This can happen during normal
629 // execution if both sides request an introduction at the same time. We
630 // can just ignore the second one, as they'll arrive in the same order in
632 nodeChannel
->Close();
636 // Deliver any pending messages, then remove the entry from our table. We do
637 // this while `mState` is still held to ensure that these messages are
638 // all sent before another thread can observe the newly created channel.
639 // As the channel hasn't been `Connect()`-ed yet, this will only queue the
640 // messages up to be sent, so is OK to do with the mutex held. These
641 // messages will be processed to be sent during `Start()` below, which is
642 // performed outside of the lock.
643 if (auto pending
= state
->mPendingMessages
.Lookup(aIntroduction
.mName
)) {
644 while (!pending
->IsEmpty()) {
645 nodeChannel
->SendEventMessage(pending
->Pop());
651 // NodeChannel::Start must be called with the lock not held, as it may lead to
652 // callbacks being made into `OnChannelError` or `OnMessageReceived`, which
653 // will attempt to re-acquire our lock.
654 nodeChannel
->Start();
// Handles a request from `aFromNode` to be introduced to the node `aName`.
//
// Visible behavior:
//  * The request is checked with NS_WARN_IF(!IsBroker()).
//  * The request is invalid if there is no channel for `aFromNode` or `aName`
//    is the invalid node name.
//  * If `aName` is not a known peer, or creating the raw pipe fails, an
//    introduction carrying a null handle is sent back to the requester.
//  * Otherwise one end of the pipe is sent to each side: the requester gets
//    an introduction to `aName` in MODE_SERVER, and `aName` gets an
//    introduction to `aFromNode` in MODE_CLIENT.
//
// NOTE(review): the returns after each rejection and the final field of the
// `introA` / `introB` initializers are not visible in this view.
657 void NodeController::OnRequestIntroduction(const NodeName
& aFromNode
,
658 const NodeName
& aName
) {
660 if (NS_WARN_IF(!IsBroker())) {
664 RefPtr
<NodeChannel
> peerA
= GetNodeChannel(aFromNode
);
665 if (!peerA
|| aName
== mojo::core::ports::kInvalidNodeName
) {
666 NODECONTROLLER_WARNING("Invalid OnRequestIntroduction message from node %s",
667 ToString(aFromNode
).c_str());
672 RefPtr
<NodeChannel
> peerB
= GetNodeChannel(aName
);
673 IPC::Channel::ChannelHandle handleA
, handleB
;
674 if (!peerB
|| !IPC::Channel::CreateRawPipe(&handleA
, &handleB
)) {
675 NODECONTROLLER_WARNING(
676 "Rejecting introduction request from '%s' for unknown peer '%s'",
677 ToString(aFromNode
).c_str(), ToString(aName
).c_str());
679 // We don't know this peer, or ran into issues creating the descriptor! Send
680 // an invalid introduction to content to clean up any pending outbound
682 NodeChannel::Introduction intro
{aName
, nullptr, IPC::Channel::MODE_SERVER
,
683 peerA
->OtherPid(), base::kInvalidProcessId
};
684 peerA
->Introduce(std::move(intro
));
688 NodeChannel::Introduction introA
{aName
, std::move(handleA
),
689 IPC::Channel::MODE_SERVER
, peerA
->OtherPid(),
691 NodeChannel::Introduction introB
{aFromNode
, std::move(handleB
),
692 IPC::Channel::MODE_CLIENT
, peerB
->OtherPid(),
694 peerA
->Introduce(std::move(introA
));
695 peerB
->Introduce(std::move(introB
));
// Handles an AcceptInvite message in which the invited node `aFromNode`
// announces its real name and initial port.
//
// Visible behavior:
//  * The message is ignored with a warning when handled as a non-broker.
//  * An invalid `aRealName` or `aInitialPort` is rejected with a warning.
//  * With the state lock held, the invite stored under `aFromNode` is removed
//    from `mInvites` and its channel is registered in `mPeers` under
//    `aRealName`. A missing invite is reported as an invalid message.
//  * The channel is renamed to `aRealName`, and the invite's `mToMerge` port
//    is merged with `aInitialPort` on the remote node; the merge result is
//    checked with PORTS_ALWAYS_OK.
//
// NOTE(review): the non-broker condition, the declaration of `invite`, the
// use of `inserted`, and the returns after warnings are not visible in this
// view.
698 void NodeController::OnAcceptInvite(const NodeName
& aFromNode
,
699 const NodeName
& aRealName
,
700 const PortName
& aInitialPort
) {
703 NODECONTROLLER_WARNING("Ignoring AcceptInvite message as non-broker");
707 if (aRealName
== mojo::core::ports::kInvalidNodeName
||
708 aInitialPort
== mojo::core::ports::kInvalidPortName
) {
709 NODECONTROLLER_WARNING("Invalid name in AcceptInvite message");
714 bool inserted
= false;
717 auto state
= mState
.Lock();
718 MOZ_ASSERT(state
->mPendingMessages
.IsEmpty(),
719 "Shouldn't have pending messages in broker");
721 // Try to remove the source node from our invites list and insert it into
722 // our peers map under the new name.
723 if (state
->mInvites
.Remove(aFromNode
, &invite
)) {
724 MOZ_ASSERT(invite
.mChannel
&& invite
.mToMerge
.is_valid());
725 state
->mPeers
.LookupOrInsertWith(aRealName
, [&]() {
727 return invite
.mChannel
;
732 NODECONTROLLER_WARNING("Invalid AcceptInvite message from node %s",
733 ToString(aFromNode
).c_str());
738 // Update the name of the node. This field is only accessed from the IO
739 // thread, so it's safe to update it without a lock held.
740 invite
.mChannel
->SetName(aRealName
);
742 // Start the port merge to allow our existing initial port to begin
743 // communicating with the remote port.
744 PORTS_ALWAYS_OK(mNode
->MergePorts(invite
.mToMerge
, aRealName
, aInitialPort
));
// Callback taking the name of a peer node, `aFromNode`.
// NOTE(review): the body of this function is not visible in this view;
// presumably it reacts to an error on the channel to `aFromNode` -- confirm
// against the complete file.
747 void NodeController::OnChannelError(const NodeName
& aFromNode
) {
752 static mojo::core::ports::NodeName
RandomNodeName() {
753 return {RandomUint64OrDie(), RandomUint64OrDie()};
// Invites a child process, reachable over `aChannel`, to join the network.
//
// Asserts that this node is the broker. A new port pair and a random invite
// name are created, and a NodeChannel is built for `aChannel` under that name
// with an invalid process id and `aChildProcessHost`. With the state lock
// held, the invite name is diagnostic-asserted to be unused and an Invite
// holding the channel and the released second port is stored in `mInvites`.
// The channel is then started.
//
// Returns the first port of the pair together with the new NodeChannel.
//
// NOTE(review): the declaration of `nodeChannel` and the messages of the two
// diagnostic assertions are not visible in this view.
756 std::tuple
<ScopedPort
, RefPtr
<NodeChannel
>> NodeController::InviteChildProcess(
757 UniquePtr
<IPC::Channel
> aChannel
,
758 GeckoChildProcessHost
* aChildProcessHost
) {
759 MOZ_ASSERT(IsBroker());
762 // Create the peer with a randomly generated name, and store it in `mInvites`.
763 // This channel and name will be used for communication with the node until it
764 // sends us its real name in an `AcceptInvite` message.
765 auto ports
= CreatePortPair();
766 auto inviteName
= RandomNodeName();
768 MakeRefPtr
<NodeChannel
>(inviteName
, std::move(aChannel
), this,
769 base::kInvalidProcessId
, aChildProcessHost
);
771 auto state
= mState
.Lock();
772 MOZ_DIAGNOSTIC_ASSERT(!state
->mPeers
.Contains(inviteName
),
774 MOZ_DIAGNOSTIC_ASSERT(!state
->mInvites
.Contains(inviteName
),
776 state
->mInvites
.InsertOrUpdate(inviteName
,
777 Invite
{nodeChannel
, ports
.second
.Release()});
780 nodeChannel
->Start();
781 return std::tuple
{std::move(ports
.first
), std::move(nodeChannel
)};
// Creates the singleton controller for the broker process, named
// `kBrokerNodeName`. Debug-asserts that no controller exists yet.
// NOTE(review): one line at the start of the body is not visible in this view.
784 void NodeController::InitBrokerProcess() {
786 MOZ_ASSERT(!gNodeController
);
787 gNodeController
= new NodeController(kBrokerNodeName
);
// Creates the singleton controller for a child process and connects it to the
// broker over `aChannel`.
//
// Debug-asserts that no controller exists yet, then creates one with a random
// node name. A port pair is created; the second port is released from its
// ScopedPort and flagged as `pending_merge_peer` while locked with a
// SinglePortLocker. A NodeChannel to the broker is built from `aChannel` and
// `aParentPid`. With the state lock held, that channel is registered in
// `mPeers` and the to-be-merged port is recorded in `mPendingMerges`, both
// under `kBrokerNodeName`. Finally the channel is started and an AcceptInvite
// carrying the new node name and the port's name is sent.
//
// Returns the first port of the pair.
//
// NOTE(review): a few lines (including one at the start of the body and the
// braces scoping the locker and the state lock) are not visible in this view.
790 ScopedPort
NodeController::InitChildProcess(UniquePtr
<IPC::Channel
> aChannel
,
791 base::ProcessId aParentPid
) {
793 MOZ_ASSERT(!gNodeController
);
795 auto nodeName
= RandomNodeName();
796 gNodeController
= new NodeController(nodeName
);
798 auto ports
= gNodeController
->CreatePortPair();
799 PortRef toMerge
= ports
.second
.Release();
801 // Mark the port as expecting a pending merge. This is a duplicate of the
802 // information tracked by `mPendingMerges`, and was added by upstream
804 // See https://chromium-review.googlesource.com/c/chromium/src/+/3289065
806 mojo::core::ports::SinglePortLocker
locker(&toMerge
);
807 locker
.port()->pending_merge_peer
= true;
810 auto nodeChannel
= MakeRefPtr
<NodeChannel
>(
811 kBrokerNodeName
, std::move(aChannel
), gNodeController
, aParentPid
);
813 auto state
= gNodeController
->mState
.Lock();
814 MOZ_DIAGNOSTIC_ASSERT(!state
->mPeers
.Contains(kBrokerNodeName
));
815 state
->mPeers
.InsertOrUpdate(kBrokerNodeName
, nodeChannel
);
816 MOZ_DIAGNOSTIC_ASSERT(!state
->mPendingMerges
.Contains(kBrokerNodeName
));
817 state
->mPendingMerges
.LookupOrInsert(kBrokerNodeName
)
818 .AppendElement(toMerge
);
821 nodeChannel
->Start();
822 nodeChannel
->AcceptInvite(nodeName
, toMerge
.name());
823 return std::move(ports
.first
);
// Tears down the singleton controller.
//
// Debug-asserts that a controller exists, keeps a local strong reference to
// it, and clears `gNodeController`. With the state lock taken, the function
// records every peer's name and channel, every invite's channel and
// `mToMerge` port, and every pending-merge port, then clears all four state
// tables. It then closes the collected channels, closes the collected ports
// on `mNode`, and reports a lost connection for every former peer.
//
// NOTE(review): one line at the start of the body and the braces delimiting
// the locked region are not visible in this view; presumably the lock is
// released before the channels and ports are closed -- confirm.
826 void NodeController::CleanUp() {
828 MOZ_ASSERT(gNodeController
);
830 RefPtr
<NodeController
> nodeController
= gNodeController
;
831 gNodeController
= nullptr;
833 // Collect all objects from our state which need to be cleaned up.
834 nsTArray
<NodeName
> lostConnections
;
835 nsTArray
<RefPtr
<NodeChannel
>> channelsToClose
;
836 nsTArray
<PortRef
> portsToClose
;
838 auto state
= nodeController
->mState
.Lock();
839 for (const auto& chan
: state
->mPeers
) {
840 lostConnections
.AppendElement(chan
.GetKey());
841 channelsToClose
.AppendElement(chan
.GetData());
843 for (const auto& invite
: state
->mInvites
.Values()) {
844 channelsToClose
.AppendElement(invite
.mChannel
);
845 portsToClose
.AppendElement(invite
.mToMerge
);
847 for (const auto& pendingPorts
: state
->mPendingMerges
.Values()) {
848 portsToClose
.AppendElements(pendingPorts
);
850 state
->mPeers
.Clear();
851 state
->mPendingMessages
.Clear();
852 state
->mInvites
.Clear();
853 state
->mPendingMerges
.Clear();
// Act on the collected objects.
855 for (auto& nodeChannel
: channelsToClose
) {
856 nodeChannel
->Close();
858 for (auto& port
: portsToClose
) {
859 nodeController
->mNode
->ClosePort(port
);
861 for (auto& name
: lostConnections
) {
862 nodeController
->mNode
->LostConnectionToNode(name
);
// Undefine the file-local logging helper macros.
866 #undef NODECONTROLLER_LOG
867 #undef NODECONTROLLER_WARNING
869 } // namespace mozilla::ipc