Bug 1916262 - Remove AbortFollower inheritance from AbortSignal r=smaug
[gecko.git] / ipc / glue / NodeController.cpp
blobd3827adedbb3c64986504dee87f336c0e2dcaf16
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
5 * You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/ipc/NodeController.h"
8 #include "MainThreadUtils.h"
9 #include "base/process_util.h"
10 #include "chrome/common/ipc_message.h"
11 #include "mojo/core/ports/name.h"
12 #include "mojo/core/ports/node.h"
13 #include "mojo/core/ports/port_locker.h"
14 #include "mozilla/AlreadyAddRefed.h"
15 #include "mozilla/RandomNum.h"
16 #include "mozilla/StaticPtr.h"
17 #include "mozilla/Assertions.h"
18 #include "mozilla/ToString.h"
19 #include "mozilla/ipc/BrowserProcessSubThread.h"
20 #include "mozilla/ipc/ProtocolUtils.h"
21 #include "mozilla/mozalloc.h"
22 #include "nsISerialEventTarget.h"
23 #include "nsTArray.h"
24 #include "nsXULAppAPI.h"
25 #include "nsPrintfCString.h"
27 #define PORTS_ALWAYS_OK(expr) MOZ_ALWAYS_TRUE(mojo::core::ports::OK == (expr))
29 namespace mozilla::ipc {
31 static StaticRefPtr<NodeController> gNodeController;
33 static LazyLogModule gNodeControllerLog{"NodeController"};
35 // Helper logger macro which includes the name of the `this` NodeController in
36 // the logged messages.
37 #define NODECONTROLLER_LOG(level_, fmt_, ...) \
38 MOZ_LOG(gNodeControllerLog, level_, \
39 ("[%s]: " fmt_, ToString(mName).c_str(), ##__VA_ARGS__))
41 // Helper warning macro which both does logger logging and emits NS_WARNING logs
42 // under debug mode.
43 #ifdef DEBUG
44 # define NODECONTROLLER_WARNING(fmt_, ...) \
45 do { \
46 nsPrintfCString warning("[%s]: " fmt_, ToString(mName).c_str(), \
47 ##__VA_ARGS__); \
48 NS_WARNING(warning.get()); \
49 MOZ_LOG(gNodeControllerLog, LogLevel::Debug, ("%s", warning.get())); \
50 } while (0)
51 #else
52 # define NODECONTROLLER_WARNING(fmt_, ...) \
53 NODECONTROLLER_LOG(LogLevel::Warning, fmt_, ##__VA_ARGS__)
54 #endif
56 NodeController::NodeController(const NodeName& aName)
57 : mName(aName), mNode(MakeUnique<Node>(aName, this)) {}
59 NodeController::~NodeController() {
60 auto state = mState.Lock();
61 MOZ_RELEASE_ASSERT(state->mPeers.IsEmpty(),
62 "Destroying NodeController before closing all peers");
63 MOZ_RELEASE_ASSERT(state->mInvites.IsEmpty(),
64 "Destroying NodeController before closing all invites");
67 // FIXME: Actually provide some way to create the thing.
68 /* static */ NodeController* NodeController::GetSingleton() {
69 MOZ_ASSERT(gNodeController);
70 return gNodeController;
73 std::pair<ScopedPort, ScopedPort> NodeController::CreatePortPair() {
74 PortRef port0, port1;
75 PORTS_ALWAYS_OK(mNode->CreatePortPair(&port0, &port1));
76 return {ScopedPort{std::move(port0), this},
77 ScopedPort{std::move(port1), this}};
80 auto NodeController::GetPort(const PortName& aName) -> PortRef {
81 PortRef port;
82 int rv = mNode->GetPort(aName, &port);
83 if (NS_WARN_IF(rv != mojo::core::ports::OK)) {
84 NODECONTROLLER_WARNING("Call to GetPort(%s) Failed",
85 ToString(aName).c_str());
86 return {};
88 return port;
91 void NodeController::SetPortObserver(const PortRef& aPort,
92 PortObserver* aObserver) {
93 PORTS_ALWAYS_OK(mNode->SetUserData(aPort, aObserver));
96 auto NodeController::GetStatus(const PortRef& aPort) -> Maybe<PortStatus> {
97 PortStatus status{};
98 int rv = mNode->GetStatus(aPort, &status);
99 if (rv != mojo::core::ports::OK) {
100 return Nothing();
102 return Some(status);
105 void NodeController::ClosePort(const PortRef& aPort) {
106 PORTS_ALWAYS_OK(mNode->ClosePort(aPort));
109 bool NodeController::GetMessage(const PortRef& aPort,
110 UniquePtr<IPC::Message>* aMessage) {
111 UniquePtr<UserMessageEvent> messageEvent;
112 int rv = mNode->GetMessage(aPort, &messageEvent, nullptr);
113 if (rv != mojo::core::ports::OK) {
114 if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
115 return false;
117 MOZ_CRASH("GetMessage on port in invalid state");
120 if (messageEvent) {
121 UniquePtr<IPC::Message> message = messageEvent->TakeMessage<IPC::Message>();
123 // If our UserMessageEvent has any ports directly attached to it, fetch them
124 // from our node and attach them to the IPC::Message we extracted.
126 // It's important to only do this if we have nonempty set of ports on the
127 // event, as we may have never serialized our IPC::Message's ports onto the
128 // event if it was routed in-process.
129 if (messageEvent->num_ports() > 0) {
130 nsTArray<ScopedPort> attachedPorts(messageEvent->num_ports());
131 for (size_t i = 0; i < messageEvent->num_ports(); ++i) {
132 attachedPorts.AppendElement(
133 ScopedPort{GetPort(messageEvent->ports()[i]), this});
135 message->SetAttachedPorts(std::move(attachedPorts));
138 *aMessage = std::move(message);
139 } else {
140 *aMessage = nullptr;
142 return true;
145 bool NodeController::SendUserMessage(const PortRef& aPort,
146 UniquePtr<IPC::Message> aMessage) {
147 auto messageEvent = MakeUnique<UserMessageEvent>(0);
148 messageEvent->AttachMessage(std::move(aMessage));
150 int rv = mNode->SendUserMessage(aPort, std::move(messageEvent));
151 if (rv == mojo::core::ports::OK) {
152 return true;
154 if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
155 NODECONTROLLER_LOG(LogLevel::Debug,
156 "Ignoring message to port %s as peer was closed",
157 ToString(aPort.name()).c_str());
158 return true;
160 NODECONTROLLER_WARNING("Failed to send message to port %s",
161 ToString(aPort.name()).c_str());
162 return false;
165 auto NodeController::SerializeEventMessage(
166 UniquePtr<Event> aEvent, const NodeName* aRelayTarget,
167 uint32_t aType) -> UniquePtr<IPC::Message> {
168 UniquePtr<IPC::Message> message;
169 if (aEvent->type() == Event::kUserMessage) {
170 MOZ_DIAGNOSTIC_ASSERT(
171 aType == EVENT_MESSAGE_TYPE,
172 "Can only send a UserMessage in an EVENT_MESSAGE_TYPE");
173 message = static_cast<UserMessageEvent*>(aEvent.get())
174 ->TakeMessage<IPC::Message>();
175 } else {
176 message = MakeUnique<IPC::Message>(MSG_ROUTING_CONTROL, aType);
179 message->set_relay(aRelayTarget != nullptr);
181 size_t length = aEvent->GetSerializedSize();
182 if (aRelayTarget) {
183 length += sizeof(NodeName);
186 // Use an intermediate buffer to serialize to avoid potential issues with the
187 // segmented `IPC::Message` bufferlist. This should be fairly cheap, as the
188 // majority of events are fairly small.
189 Vector<char, 256, InfallibleAllocPolicy> buffer;
190 (void)buffer.initLengthUninitialized(length);
191 if (aRelayTarget) {
192 memcpy(buffer.begin(), aRelayTarget, sizeof(NodeName));
193 aEvent->Serialize(buffer.begin() + sizeof(NodeName));
194 } else {
195 aEvent->Serialize(buffer.begin());
198 message->WriteFooter(buffer.begin(), buffer.length());
199 message->set_event_footer_size(buffer.length());
201 #ifdef DEBUG
202 // Debug-assert that we can read the same data back out of the buffer.
203 MOZ_ASSERT(message->event_footer_size() == length);
204 Vector<char, 256, InfallibleAllocPolicy> buffer2;
205 (void)buffer2.initLengthUninitialized(message->event_footer_size());
206 MOZ_ASSERT(message->ReadFooter(buffer2.begin(), buffer2.length(),
207 /* truncate */ false));
208 MOZ_ASSERT(!memcmp(buffer2.begin(), buffer.begin(), buffer.length()));
209 #endif
211 return message;
214 auto NodeController::DeserializeEventMessage(UniquePtr<IPC::Message> aMessage,
215 NodeName* aRelayTarget)
216 -> UniquePtr<Event> {
217 if (aMessage->is_relay() && !aRelayTarget) {
218 NODECONTROLLER_WARNING("Unexpected relay message '%s'", aMessage->name());
219 return nullptr;
222 Vector<char, 256, InfallibleAllocPolicy> buffer;
223 (void)buffer.initLengthUninitialized(aMessage->event_footer_size());
224 // Truncate the message when reading the footer, so that the extra footer data
225 // is no longer present in the message. This allows future code to eventually
226 // send the same `IPC::Message` to another process.
227 if (!aMessage->ReadFooter(buffer.begin(), buffer.length(),
228 /* truncate */ true)) {
229 NODECONTROLLER_WARNING("Call to ReadFooter for message '%s' Failed",
230 aMessage->name());
231 return nullptr;
233 aMessage->set_event_footer_size(0);
235 UniquePtr<Event> event;
236 if (aRelayTarget) {
237 MOZ_ASSERT(aMessage->is_relay());
238 if (buffer.length() < sizeof(NodeName)) {
239 NODECONTROLLER_WARNING(
240 "Insufficient space in message footer for message '%s'",
241 aMessage->name());
242 return nullptr;
244 memcpy(aRelayTarget, buffer.begin(), sizeof(NodeName));
245 event = Event::Deserialize(buffer.begin() + sizeof(NodeName),
246 buffer.length() - sizeof(NodeName));
247 } else {
248 event = Event::Deserialize(buffer.begin(), buffer.length());
251 if (!event) {
252 NODECONTROLLER_WARNING("Call to Event::Deserialize for message '%s' Failed",
253 aMessage->name());
254 return nullptr;
257 if (event->type() == Event::kUserMessage) {
258 static_cast<UserMessageEvent*>(event.get())
259 ->AttachMessage(std::move(aMessage));
261 return event;
264 already_AddRefed<NodeChannel> NodeController::GetNodeChannel(
265 const NodeName& aName) {
266 auto state = mState.Lock();
267 return do_AddRef(state->mPeers.Get(aName));
270 void NodeController::DropPeer(NodeName aNodeName) {
271 AssertIOThread();
273 #ifdef FUZZING_SNAPSHOT
274 MOZ_FUZZING_IPC_DROP_PEER("NodeController::DropPeer");
275 #endif
277 Invite invite;
278 RefPtr<NodeChannel> channel;
279 nsTArray<PortRef> pendingMerges;
281 auto state = mState.Lock();
282 state->mPeers.Remove(aNodeName, &channel);
283 state->mPendingMessages.Remove(aNodeName);
284 state->mInvites.Remove(aNodeName, &invite);
285 state->mPendingMerges.Remove(aNodeName, &pendingMerges);
288 NODECONTROLLER_LOG(LogLevel::Info, "Dropping Peer %s (pid: %" PRIPID ")",
289 ToString(aNodeName).c_str(),
290 channel ? channel->OtherPid() : base::kInvalidProcessId);
292 if (channel) {
293 channel->Close();
295 if (invite.mChannel) {
296 invite.mChannel->Close();
298 if (invite.mToMerge.is_valid()) {
299 // Ignore any possible errors here.
300 (void)mNode->ClosePort(invite.mToMerge);
302 for (auto& port : pendingMerges) {
303 // Ignore any possible errors here.
304 (void)mNode->ClosePort(port);
306 mNode->LostConnectionToNode(aNodeName);
309 void NodeController::ContactRemotePeer(const NodeName& aNode,
310 UniquePtr<Event> aEvent) {
311 // On Windows and macOS, messages holding HANDLEs or mach ports must be
312 // relayed via the broker process so it can transfer ownership.
313 bool needsRelay = false;
314 #if defined(XP_WIN) || defined(XP_DARWIN)
315 if (aEvent && !IsBroker() && aNode != kBrokerNodeName &&
316 aEvent->type() == Event::kUserMessage) {
317 auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get());
318 needsRelay =
319 userEvent->HasMessage() &&
320 userEvent->GetMessage<IPC::Message>()->num_relayed_attachments() > 0;
322 #endif
324 UniquePtr<IPC::Message> message;
325 if (aEvent) {
326 message =
327 SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr);
328 MOZ_ASSERT(message->is_relay() == needsRelay,
329 "Message relay status set incorrectly");
332 RefPtr<NodeChannel> peer;
333 RefPtr<NodeChannel> broker;
334 bool needsIntroduction = false;
335 bool needsBroker = needsRelay;
337 auto state = mState.Lock();
339 // Check if we know this peer. If we don't, we'll need to request an
340 // introduction.
341 peer = state->mPeers.Get(aNode);
342 if (!peer) {
343 // We don't know the peer, check if we've already requested an
344 // introduction, or if we need to request a new one.
345 auto& queue = state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
346 needsIntroduction = true;
347 needsBroker = true;
348 return Queue<UniquePtr<IPC::Message>, 64>{};
350 // If we aren't relaying, queue up the message to be sent.
351 if (message && !needsRelay) {
352 queue.Push(std::move(message));
356 if (needsBroker && !IsBroker()) {
357 broker = state->mPeers.Get(kBrokerNodeName);
361 if (needsBroker && !broker) {
362 NODECONTROLLER_WARNING(
363 "Dropping message '%s'; no connection to unknown peer %s",
364 message ? message->name() : "<null>", ToString(aNode).c_str());
365 if (needsIntroduction) {
366 // We have no broker and will never be able to be introduced to this node.
367 // Queue a task to clean up any ports connected to it.
368 XRE_GetIOMessageLoop()->PostTask(NewRunnableMethod<NodeName>(
369 "NodeController::DropPeer", this, &NodeController::DropPeer, aNode));
371 return;
374 if (needsIntroduction) {
375 NODECONTROLLER_LOG(LogLevel::Info, "Requesting introduction to peer %s",
376 ToString(aNode).c_str());
377 broker->RequestIntroduction(aNode);
380 if (message) {
381 if (needsRelay) {
382 NODECONTROLLER_LOG(LogLevel::Info,
383 "Relaying message '%s' for peer %s due to %" PRIu32
384 " attachments",
385 message->name(), ToString(aNode).c_str(),
386 message->num_relayed_attachments());
387 MOZ_ASSERT(message->num_relayed_attachments() > 0 && broker);
388 broker->SendEventMessage(std::move(message));
389 } else if (peer) {
390 peer->SendEventMessage(std::move(message));
395 void NodeController::ForwardEvent(const NodeName& aNode,
396 UniquePtr<Event> aEvent) {
397 MOZ_ASSERT(aEvent, "cannot forward null event");
398 if (aNode == mName) {
399 (void)mNode->AcceptEvent(mName, std::move(aEvent));
400 } else {
401 ContactRemotePeer(aNode, std::move(aEvent));
405 void NodeController::BroadcastEvent(UniquePtr<Event> aEvent) {
406 UniquePtr<IPC::Message> message =
407 SerializeEventMessage(std::move(aEvent), nullptr, BROADCAST_MESSAGE_TYPE);
409 if (IsBroker()) {
410 OnBroadcast(mName, std::move(message));
411 } else if (RefPtr<NodeChannel> broker = GetNodeChannel(kBrokerNodeName)) {
412 broker->Broadcast(std::move(message));
413 } else {
414 NODECONTROLLER_WARNING(
415 "Trying to broadcast event, but no connection to broker");
419 void NodeController::PortStatusChanged(const PortRef& aPortRef) {
420 RefPtr<UserData> userData;
421 int rv = mNode->GetUserData(aPortRef, &userData);
422 if (rv != mojo::core::ports::OK) {
423 NODECONTROLLER_WARNING("GetUserData call for port '%s' failed",
424 ToString(aPortRef.name()).c_str());
425 return;
427 if (userData) {
428 // All instances of `UserData` attached to ports in this node must be of
429 // type `PortObserver`, so we can call `OnPortStatusChanged` directly on
430 // them.
431 static_cast<PortObserver*>(userData.get())->OnPortStatusChanged();
435 void NodeController::ObserveRemoteNode(const NodeName& aNode) {
436 MOZ_ASSERT(aNode != mName);
437 ContactRemotePeer(aNode, nullptr);
440 void NodeController::OnEventMessage(const NodeName& aFromNode,
441 UniquePtr<IPC::Message> aMessage) {
442 AssertIOThread();
444 bool isRelay = aMessage->is_relay();
445 if (isRelay && aMessage->num_relayed_attachments() == 0) {
446 NODECONTROLLER_WARNING(
447 "Invalid relay message without relayed attachments from peer %s!",
448 ToString(aFromNode).c_str());
449 DropPeer(aFromNode);
450 return;
453 NodeName relayTarget;
454 UniquePtr<Event> event = DeserializeEventMessage(
455 std::move(aMessage), isRelay ? &relayTarget : nullptr);
456 if (!event) {
457 NODECONTROLLER_WARNING("Invalid EventMessage from peer %s!",
458 ToString(aFromNode).c_str());
459 DropPeer(aFromNode);
460 return;
463 NodeName fromNode = aFromNode;
464 #if defined(XP_WIN) || defined(XP_DARWIN)
465 if (isRelay) {
466 if (event->type() != Event::kUserMessage) {
467 NODECONTROLLER_WARNING(
468 "Unexpected relay of non-UserMessage event from peer %s!",
469 ToString(aFromNode).c_str());
470 DropPeer(aFromNode);
471 return;
474 // If we're the broker, then we'll need to forward this message on to the
475 // true recipient. To do this, we re-serialize the message, passing along
476 // the original source node, and send it to the final node.
477 if (IsBroker()) {
478 UniquePtr<IPC::Message> message =
479 SerializeEventMessage(std::move(event), &aFromNode);
480 if (!message) {
481 NODECONTROLLER_WARNING(
482 "Relaying EventMessage from peer %s failed to re-serialize!",
483 ToString(aFromNode).c_str());
484 DropPeer(aFromNode);
485 return;
487 MOZ_ASSERT(message->is_relay(), "Message stopped being a relay message?");
488 MOZ_ASSERT(message->num_relayed_attachments() > 0,
489 "Message doesn't have relayed attachments?");
491 NODECONTROLLER_LOG(
492 LogLevel::Info,
493 "Relaying message '%s' from peer %s to peer %s (%" PRIu32
494 " attachments)",
495 message->name(), ToString(aFromNode).c_str(),
496 ToString(relayTarget).c_str(), message->num_relayed_attachments());
498 RefPtr<NodeChannel> peer;
500 auto state = mState.Lock();
501 peer = state->mPeers.Get(relayTarget);
503 if (!peer) {
504 NODECONTROLLER_WARNING(
505 "Dropping relayed message from %s to unknown peer %s",
506 ToString(aFromNode).c_str(), ToString(relayTarget).c_str());
507 return;
510 peer->SendEventMessage(std::move(message));
511 return;
514 // Otherwise, we're the final recipient, so we can continue & process the
515 // message as usual.
516 if (aFromNode != kBrokerNodeName) {
517 NODECONTROLLER_WARNING(
518 "Unexpected relayed EventMessage from non-broker peer %s!",
519 ToString(aFromNode).c_str());
520 DropPeer(aFromNode);
521 return;
523 fromNode = relayTarget;
525 NODECONTROLLER_LOG(LogLevel::Info, "Got relayed message from peer %s",
526 ToString(fromNode).c_str());
528 #endif
530 // If we're getting a requested port merge from another process, check to make
531 // sure that we're expecting the request, and record that the merge has
532 // arrived so we don't try to close the port on error.
533 if (event->type() == Event::kMergePort) {
534 // Check that the target port for the merge actually exists.
535 auto targetPort = GetPort(event->port_name());
536 if (!targetPort.is_valid()) {
537 NODECONTROLLER_WARNING(
538 "Unexpected MergePortEvent from peer %s for unknown port %s",
539 ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
540 DropPeer(fromNode);
541 return;
544 // Check if `targetPort` is in our pending merges entry for the given source
545 // node. If this makes the `mPendingMerges` entry empty, remove it.
546 bool expectingMerge = [&] {
547 auto state = mState.Lock();
548 auto pendingMerges = state->mPendingMerges.Lookup(aFromNode);
549 if (!pendingMerges) {
550 return false;
552 size_t removed = pendingMerges->RemoveElementsBy(
553 [&](auto& port) { return port.name() == targetPort.name(); });
554 if (removed != 0 && pendingMerges->IsEmpty()) {
555 pendingMerges.Remove();
557 return removed != 0;
558 }();
560 if (!expectingMerge) {
561 NODECONTROLLER_WARNING(
562 "Unexpected MergePortEvent from peer %s for port %s",
563 ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
564 DropPeer(fromNode);
565 return;
569 (void)mNode->AcceptEvent(fromNode, std::move(event));
572 void NodeController::OnBroadcast(const NodeName& aFromNode,
573 UniquePtr<IPC::Message> aMessage) {
574 MOZ_DIAGNOSTIC_ASSERT(aMessage->type() == BROADCAST_MESSAGE_TYPE);
576 // NOTE: This method may be called off of the IO thread by the
577 // `BroadcastEvent` node callback.
578 if (!IsBroker()) {
579 NODECONTROLLER_WARNING("Broadcast request received by non-broker node");
580 return;
583 UniquePtr<Event> event = DeserializeEventMessage(std::move(aMessage));
584 if (!event) {
585 NODECONTROLLER_WARNING("Invalid broadcast message from peer");
586 return;
589 nsTArray<RefPtr<NodeChannel>> peers;
591 auto state = mState.Lock();
592 peers.SetCapacity(state->mPeers.Count());
593 for (const auto& peer : state->mPeers.Values()) {
594 peers.AppendElement(peer);
597 for (const auto& peer : peers) {
598 // NOTE: This `clone` operation is only supported for a limited number of
599 // message types by the ports API, which provides some extra security by
600 // only allowing those specific types of messages to be broadcasted.
601 // Messages which don't support `CloneForBroadcast` cannot be broadcast, and
602 // the ports library will not attempt to broadcast them.
603 auto clone = event->CloneForBroadcast();
604 if (!clone) {
605 NODECONTROLLER_WARNING("Attempt to broadcast unsupported message");
606 break;
609 peer->SendEventMessage(SerializeEventMessage(std::move(clone)));
613 void NodeController::OnIntroduce(const NodeName& aFromNode,
614 NodeChannel::Introduction aIntroduction) {
615 AssertIOThread();
617 if (aFromNode != kBrokerNodeName) {
618 NODECONTROLLER_WARNING("Introduction received from non-broker node");
619 DropPeer(aFromNode);
620 return;
623 MOZ_ASSERT(aIntroduction.mMyPid == base::GetCurrentProcId(),
624 "We're the wrong process to receive this?");
626 if (!aIntroduction.mHandle) {
627 NODECONTROLLER_WARNING("Could not be introduced to peer %s",
628 ToString(aIntroduction.mName).c_str());
629 mNode->LostConnectionToNode(aIntroduction.mName);
631 auto state = mState.Lock();
632 state->mPendingMessages.Remove(aIntroduction.mName);
633 return;
636 auto channel =
637 MakeUnique<IPC::Channel>(std::move(aIntroduction.mHandle),
638 aIntroduction.mMode, aIntroduction.mOtherPid);
639 auto nodeChannel = MakeRefPtr<NodeChannel>(
640 aIntroduction.mName, std::move(channel), this, aIntroduction.mOtherPid);
643 auto state = mState.Lock();
644 bool isNew = false;
645 state->mPeers.LookupOrInsertWith(aIntroduction.mName, [&]() {
646 isNew = true;
647 return nodeChannel;
649 if (!isNew) {
650 // We got a duplicate introduction. This can happen during normal
651 // execution if both sides request an introduction at the same time. We
652 // can just ignore the second one, as they'll arrive in the same order in
653 // both processes.
654 nodeChannel->Close();
655 return;
658 // Deliver any pending messages, then remove the entry from our table. We do
659 // this while `mState` is still held to ensure that these messages are
660 // all sent before another thread can observe the newly created channel.
661 // As the channel hasn't been `Connect()`-ed yet, this will only queue the
662 // messages up to be sent, so is OK to do with the mutex held. These
663 // messages will be processed to be sent during `Start()` below, which is
664 // performed outside of the lock.
665 if (auto pending = state->mPendingMessages.Lookup(aIntroduction.mName)) {
666 while (!pending->IsEmpty()) {
667 nodeChannel->SendEventMessage(pending->Pop());
669 pending.Remove();
673 // NodeChannel::Start must be called with the lock not held, as it may lead to
674 // callbacks being made into `OnChannelError` or `OnMessageReceived`, which
675 // will attempt to re-acquire our lock.
676 nodeChannel->Start();
679 void NodeController::OnRequestIntroduction(const NodeName& aFromNode,
680 const NodeName& aName) {
681 AssertIOThread();
682 if (NS_WARN_IF(!IsBroker())) {
683 return;
686 RefPtr<NodeChannel> peerA = GetNodeChannel(aFromNode);
687 if (!peerA || aName == mojo::core::ports::kInvalidNodeName) {
688 NODECONTROLLER_WARNING("Invalid OnRequestIntroduction message from node %s",
689 ToString(aFromNode).c_str());
690 DropPeer(aFromNode);
691 return;
694 RefPtr<NodeChannel> peerB = GetNodeChannel(aName);
695 IPC::Channel::ChannelHandle handleA, handleB;
696 if (!peerB || !IPC::Channel::CreateRawPipe(&handleA, &handleB)) {
697 NODECONTROLLER_WARNING(
698 "Rejecting introduction request from '%s' for unknown peer '%s'",
699 ToString(aFromNode).c_str(), ToString(aName).c_str());
701 // We don't know this peer, or ran into issues creating the descriptor! Send
702 // an invalid introduction to content to clean up any pending outbound
703 // messages.
704 NodeChannel::Introduction intro{aName, nullptr, IPC::Channel::MODE_SERVER,
705 peerA->OtherPid(), base::kInvalidProcessId};
706 peerA->Introduce(std::move(intro));
707 return;
710 NodeChannel::Introduction introA{aName, std::move(handleA),
711 IPC::Channel::MODE_SERVER, peerA->OtherPid(),
712 peerB->OtherPid()};
713 NodeChannel::Introduction introB{aFromNode, std::move(handleB),
714 IPC::Channel::MODE_CLIENT, peerB->OtherPid(),
715 peerA->OtherPid()};
716 peerA->Introduce(std::move(introA));
717 peerB->Introduce(std::move(introB));
720 void NodeController::OnAcceptInvite(const NodeName& aFromNode,
721 const NodeName& aRealName,
722 const PortName& aInitialPort) {
723 AssertIOThread();
724 if (!IsBroker()) {
725 NODECONTROLLER_WARNING("Ignoring AcceptInvite message as non-broker");
726 return;
729 if (aRealName == mojo::core::ports::kInvalidNodeName ||
730 aInitialPort == mojo::core::ports::kInvalidPortName) {
731 NODECONTROLLER_WARNING("Invalid name in AcceptInvite message");
732 DropPeer(aFromNode);
733 return;
736 bool inserted = false;
737 Invite invite;
739 auto state = mState.Lock();
741 // Try to remove the source node from our invites list and insert it into
742 // our peers map under the new name.
743 if (state->mInvites.Remove(aFromNode, &invite)) {
744 MOZ_ASSERT(invite.mChannel && invite.mToMerge.is_valid());
745 state->mPeers.LookupOrInsertWith(aRealName, [&]() {
746 inserted = true;
747 return invite.mChannel;
751 if (!inserted) {
752 NODECONTROLLER_WARNING("Invalid AcceptInvite message from node %s",
753 ToString(aFromNode).c_str());
754 DropPeer(aFromNode);
755 return;
758 // Update the name of the node. This field is only accessed from the IO
759 // thread, so it's safe to update it without a lock held.
760 invite.mChannel->SetName(aRealName);
762 // Start the port merge to allow our existing initial port to begin
763 // communicating with the remote port.
764 PORTS_ALWAYS_OK(mNode->MergePorts(invite.mToMerge, aRealName, aInitialPort));
767 void NodeController::OnChannelError(const NodeName& aFromNode) {
768 AssertIOThread();
769 DropPeer(aFromNode);
772 static mojo::core::ports::NodeName RandomNodeName() {
773 return {RandomUint64OrDie(), RandomUint64OrDie()};
776 std::tuple<ScopedPort, RefPtr<NodeChannel>> NodeController::InviteChildProcess(
777 UniquePtr<IPC::Channel> aChannel,
778 GeckoChildProcessHost* aChildProcessHost) {
779 MOZ_ASSERT(IsBroker());
780 AssertIOThread();
782 // Create the peer with a randomly generated name, and store it in `mInvites`.
783 // This channel and name will be used for communication with the node until it
784 // sends us its' real name in an `AcceptInvite` message.
785 auto ports = CreatePortPair();
786 auto inviteName = RandomNodeName();
787 auto nodeChannel =
788 MakeRefPtr<NodeChannel>(inviteName, std::move(aChannel), this,
789 base::kInvalidProcessId, aChildProcessHost);
791 auto state = mState.Lock();
792 MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(inviteName),
793 "UUID conflict?");
794 MOZ_DIAGNOSTIC_ASSERT(!state->mInvites.Contains(inviteName),
795 "UUID conflict?");
796 state->mInvites.InsertOrUpdate(inviteName,
797 Invite{nodeChannel, ports.second.Release()});
800 nodeChannel->Start();
801 return std::tuple{std::move(ports.first), std::move(nodeChannel)};
804 void NodeController::InitBrokerProcess() {
805 AssertIOThread();
806 MOZ_ASSERT(!gNodeController);
807 gNodeController = new NodeController(kBrokerNodeName);
810 ScopedPort NodeController::InitChildProcess(UniquePtr<IPC::Channel> aChannel,
811 base::ProcessId aParentPid) {
812 AssertIOThread();
813 MOZ_ASSERT(!gNodeController);
815 auto nodeName = RandomNodeName();
816 gNodeController = new NodeController(nodeName);
818 auto ports = gNodeController->CreatePortPair();
819 PortRef toMerge = ports.second.Release();
821 // Mark the port as expecting a pending merge. This is a duplicate of the
822 // information tracked by `mPendingMerges`, and was added by upstream
823 // chromium.
824 // See https://chromium-review.googlesource.com/c/chromium/src/+/3289065
826 mojo::core::ports::SinglePortLocker locker(&toMerge);
827 locker.port()->pending_merge_peer = true;
830 auto nodeChannel = MakeRefPtr<NodeChannel>(
831 kBrokerNodeName, std::move(aChannel), gNodeController, aParentPid);
833 auto state = gNodeController->mState.Lock();
834 MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(kBrokerNodeName));
835 state->mPeers.InsertOrUpdate(kBrokerNodeName, nodeChannel);
836 MOZ_DIAGNOSTIC_ASSERT(!state->mPendingMerges.Contains(kBrokerNodeName));
837 state->mPendingMerges.LookupOrInsert(kBrokerNodeName)
838 .AppendElement(toMerge);
841 nodeChannel->Start();
842 nodeChannel->AcceptInvite(nodeName, toMerge.name());
843 return std::move(ports.first);
846 void NodeController::CleanUp() {
847 AssertIOThread();
848 MOZ_ASSERT(gNodeController);
850 RefPtr<NodeController> nodeController = gNodeController;
851 gNodeController = nullptr;
853 // Collect all objects from our state which need to be cleaned up.
854 nsTArray<NodeName> lostConnections;
855 nsTArray<RefPtr<NodeChannel>> channelsToClose;
856 nsTArray<PortRef> portsToClose;
858 auto state = nodeController->mState.Lock();
859 for (const auto& chan : state->mPeers) {
860 lostConnections.AppendElement(chan.GetKey());
861 channelsToClose.AppendElement(chan.GetData());
863 for (const auto& pending : state->mPendingMessages.Keys()) {
864 lostConnections.AppendElement(pending);
866 for (const auto& invite : state->mInvites.Values()) {
867 channelsToClose.AppendElement(invite.mChannel);
868 portsToClose.AppendElement(invite.mToMerge);
870 for (const auto& pendingPorts : state->mPendingMerges.Values()) {
871 portsToClose.AppendElements(pendingPorts);
873 state->mPeers.Clear();
874 state->mPendingMessages.Clear();
875 state->mInvites.Clear();
876 state->mPendingMerges.Clear();
878 for (auto& nodeChannel : channelsToClose) {
879 nodeChannel->Close();
881 for (auto& port : portsToClose) {
882 nodeController->mNode->ClosePort(port);
884 for (auto& name : lostConnections) {
885 nodeController->mNode->LostConnectionToNode(name);
889 #undef NODECONTROLLER_LOG
890 #undef NODECONTROLLER_WARNING
892 } // namespace mozilla::ipc