no bug - Bumping Firefox l10n changesets r=release a=l10n-bump DONTBUILD CLOSED TREE
[gecko.git] / ipc / glue / NodeController.cpp
blob532e4fa5092036805797bbcf7bb60060880e1f9b
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
5 * You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/ipc/NodeController.h"
8 #include "MainThreadUtils.h"
9 #include "base/process_util.h"
10 #include "chrome/common/ipc_message.h"
11 #include "mojo/core/ports/name.h"
12 #include "mojo/core/ports/node.h"
13 #include "mojo/core/ports/port_locker.h"
14 #include "mozilla/AlreadyAddRefed.h"
15 #include "mozilla/RandomNum.h"
16 #include "mozilla/StaticPtr.h"
17 #include "mozilla/Assertions.h"
18 #include "mozilla/ToString.h"
19 #include "mozilla/ipc/BrowserProcessSubThread.h"
20 #include "mozilla/ipc/ProtocolUtils.h"
21 #include "mozilla/mozalloc.h"
22 #include "nsISerialEventTarget.h"
23 #include "nsTArray.h"
24 #include "nsXULAppAPI.h"
25 #include "nsPrintfCString.h"
// Wraps a ports-library call in MOZ_ALWAYS_TRUE, comparing the call's result
// against `mojo::core::ports::OK`.
#define PORTS_ALWAYS_OK(expr) MOZ_ALWAYS_TRUE((expr) == mojo::core::ports::OK)
29 namespace mozilla::ipc {
31 static StaticRefPtr<NodeController> gNodeController;
33 static LazyLogModule gNodeControllerLog{"NodeController"};
// Logging helper: forwards to MOZ_LOG on `gNodeControllerLog`, prefixing every
// message with the name (`mName`) of the `this` NodeController. Must therefore
// only be used where `mName` is in scope.
#define NODECONTROLLER_LOG(level_, fmt_, ...) \
  MOZ_LOG(gNodeControllerLog, level_,         \
          ("[%s]: " fmt_, ToString(mName).c_str(), ##__VA_ARGS__))
// Warning helper. In DEBUG builds the message is formatted once, emitted via
// NS_WARNING, and also logged at Debug level. In other builds it is simply
// logged at Warning level through NODECONTROLLER_LOG.
#ifdef DEBUG
#  define NODECONTROLLER_WARNING(fmt_, ...)                                \
    do {                                                                   \
      nsPrintfCString nodeWarning_("[%s]: " fmt_, ToString(mName).c_str(), \
                                   ##__VA_ARGS__);                         \
      NS_WARNING(nodeWarning_.get());                                      \
      MOZ_LOG(gNodeControllerLog, LogLevel::Debug,                         \
              ("%s", nodeWarning_.get()));                                 \
    } while (false)
#else
#  define NODECONTROLLER_WARNING(fmt_, ...) \
    NODECONTROLLER_LOG(LogLevel::Warning, fmt_, ##__VA_ARGS__)
#endif
56 NodeController::NodeController(const NodeName& aName)
57 : mName(aName), mNode(MakeUnique<Node>(aName, this)) {}
59 NodeController::~NodeController() {
60 auto state = mState.Lock();
61 MOZ_RELEASE_ASSERT(state->mPeers.IsEmpty(),
62 "Destroying NodeController before closing all peers");
63 MOZ_RELEASE_ASSERT(state->mInvites.IsEmpty(),
64 "Destroying NodeController before closing all invites");
67 // FIXME: Actually provide some way to create the thing.
68 /* static */ NodeController* NodeController::GetSingleton() {
69 MOZ_ASSERT(gNodeController);
70 return gNodeController;
73 std::pair<ScopedPort, ScopedPort> NodeController::CreatePortPair() {
74 PortRef port0, port1;
75 PORTS_ALWAYS_OK(mNode->CreatePortPair(&port0, &port1));
76 return {ScopedPort{std::move(port0), this},
77 ScopedPort{std::move(port1), this}};
80 auto NodeController::GetPort(const PortName& aName) -> PortRef {
81 PortRef port;
82 int rv = mNode->GetPort(aName, &port);
83 if (NS_WARN_IF(rv != mojo::core::ports::OK)) {
84 NODECONTROLLER_WARNING("Call to GetPort(%s) Failed",
85 ToString(aName).c_str());
86 return {};
88 return port;
91 void NodeController::SetPortObserver(const PortRef& aPort,
92 PortObserver* aObserver) {
93 PORTS_ALWAYS_OK(mNode->SetUserData(aPort, aObserver));
96 auto NodeController::GetStatus(const PortRef& aPort) -> Maybe<PortStatus> {
97 PortStatus status{};
98 int rv = mNode->GetStatus(aPort, &status);
99 if (rv != mojo::core::ports::OK) {
100 return Nothing();
102 return Some(status);
105 void NodeController::ClosePort(const PortRef& aPort) {
106 PORTS_ALWAYS_OK(mNode->ClosePort(aPort));
109 bool NodeController::GetMessage(const PortRef& aPort,
110 UniquePtr<IPC::Message>* aMessage) {
111 UniquePtr<UserMessageEvent> messageEvent;
112 int rv = mNode->GetMessage(aPort, &messageEvent, nullptr);
113 if (rv != mojo::core::ports::OK) {
114 if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
115 return false;
117 MOZ_CRASH("GetMessage on port in invalid state");
120 if (messageEvent) {
121 UniquePtr<IPC::Message> message = messageEvent->TakeMessage<IPC::Message>();
123 // If our UserMessageEvent has any ports directly attached to it, fetch them
124 // from our node and attach them to the IPC::Message we extracted.
126 // It's important to only do this if we have nonempty set of ports on the
127 // event, as we may have never serialized our IPC::Message's ports onto the
128 // event if it was routed in-process.
129 if (messageEvent->num_ports() > 0) {
130 nsTArray<ScopedPort> attachedPorts(messageEvent->num_ports());
131 for (size_t i = 0; i < messageEvent->num_ports(); ++i) {
132 attachedPorts.AppendElement(
133 ScopedPort{GetPort(messageEvent->ports()[i]), this});
135 message->SetAttachedPorts(std::move(attachedPorts));
138 *aMessage = std::move(message);
139 } else {
140 *aMessage = nullptr;
142 return true;
145 bool NodeController::SendUserMessage(const PortRef& aPort,
146 UniquePtr<IPC::Message> aMessage) {
147 auto messageEvent = MakeUnique<UserMessageEvent>(0);
148 messageEvent->AttachMessage(std::move(aMessage));
150 int rv = mNode->SendUserMessage(aPort, std::move(messageEvent));
151 if (rv == mojo::core::ports::OK) {
152 return true;
154 if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
155 NODECONTROLLER_LOG(LogLevel::Debug,
156 "Ignoring message to port %s as peer was closed",
157 ToString(aPort.name()).c_str());
158 return true;
160 NODECONTROLLER_WARNING("Failed to send message to port %s",
161 ToString(aPort.name()).c_str());
162 return false;
165 auto NodeController::SerializeEventMessage(UniquePtr<Event> aEvent,
166 const NodeName* aRelayTarget,
167 uint32_t aType)
168 -> UniquePtr<IPC::Message> {
169 UniquePtr<IPC::Message> message;
170 if (aEvent->type() == Event::kUserMessage) {
171 MOZ_DIAGNOSTIC_ASSERT(
172 aType == EVENT_MESSAGE_TYPE,
173 "Can only send a UserMessage in an EVENT_MESSAGE_TYPE");
174 message = static_cast<UserMessageEvent*>(aEvent.get())
175 ->TakeMessage<IPC::Message>();
176 } else {
177 message = MakeUnique<IPC::Message>(MSG_ROUTING_CONTROL, aType);
180 message->set_relay(aRelayTarget != nullptr);
182 size_t length = aEvent->GetSerializedSize();
183 if (aRelayTarget) {
184 length += sizeof(NodeName);
187 // Use an intermediate buffer to serialize to avoid potential issues with the
188 // segmented `IPC::Message` bufferlist. This should be fairly cheap, as the
189 // majority of events are fairly small.
190 Vector<char, 256, InfallibleAllocPolicy> buffer;
191 (void)buffer.initLengthUninitialized(length);
192 if (aRelayTarget) {
193 memcpy(buffer.begin(), aRelayTarget, sizeof(NodeName));
194 aEvent->Serialize(buffer.begin() + sizeof(NodeName));
195 } else {
196 aEvent->Serialize(buffer.begin());
199 message->WriteFooter(buffer.begin(), buffer.length());
200 message->set_event_footer_size(buffer.length());
202 #ifdef DEBUG
203 // Debug-assert that we can read the same data back out of the buffer.
204 MOZ_ASSERT(message->event_footer_size() == length);
205 Vector<char, 256, InfallibleAllocPolicy> buffer2;
206 (void)buffer2.initLengthUninitialized(message->event_footer_size());
207 MOZ_ASSERT(message->ReadFooter(buffer2.begin(), buffer2.length(),
208 /* truncate */ false));
209 MOZ_ASSERT(!memcmp(buffer2.begin(), buffer.begin(), buffer.length()));
210 #endif
212 return message;
215 auto NodeController::DeserializeEventMessage(UniquePtr<IPC::Message> aMessage,
216 NodeName* aRelayTarget)
217 -> UniquePtr<Event> {
218 if (aMessage->is_relay() && !aRelayTarget) {
219 NODECONTROLLER_WARNING("Unexpected relay message '%s'", aMessage->name());
220 return nullptr;
223 Vector<char, 256, InfallibleAllocPolicy> buffer;
224 (void)buffer.initLengthUninitialized(aMessage->event_footer_size());
225 // Truncate the message when reading the footer, so that the extra footer data
226 // is no longer present in the message. This allows future code to eventually
227 // send the same `IPC::Message` to another process.
228 if (!aMessage->ReadFooter(buffer.begin(), buffer.length(),
229 /* truncate */ true)) {
230 NODECONTROLLER_WARNING("Call to ReadFooter for message '%s' Failed",
231 aMessage->name());
232 return nullptr;
234 aMessage->set_event_footer_size(0);
236 UniquePtr<Event> event;
237 if (aRelayTarget) {
238 MOZ_ASSERT(aMessage->is_relay());
239 if (buffer.length() < sizeof(NodeName)) {
240 NODECONTROLLER_WARNING(
241 "Insufficient space in message footer for message '%s'",
242 aMessage->name());
243 return nullptr;
245 memcpy(aRelayTarget, buffer.begin(), sizeof(NodeName));
246 event = Event::Deserialize(buffer.begin() + sizeof(NodeName),
247 buffer.length() - sizeof(NodeName));
248 } else {
249 event = Event::Deserialize(buffer.begin(), buffer.length());
252 if (!event) {
253 NODECONTROLLER_WARNING("Call to Event::Deserialize for message '%s' Failed",
254 aMessage->name());
255 return nullptr;
258 if (event->type() == Event::kUserMessage) {
259 static_cast<UserMessageEvent*>(event.get())
260 ->AttachMessage(std::move(aMessage));
262 return event;
265 already_AddRefed<NodeChannel> NodeController::GetNodeChannel(
266 const NodeName& aName) {
267 auto state = mState.Lock();
268 return do_AddRef(state->mPeers.Get(aName));
271 void NodeController::DropPeer(NodeName aNodeName) {
272 AssertIOThread();
274 #ifdef FUZZING_SNAPSHOT
275 MOZ_FUZZING_IPC_DROP_PEER("NodeController::DropPeer");
276 #endif
278 Invite invite;
279 RefPtr<NodeChannel> channel;
280 nsTArray<PortRef> pendingMerges;
282 auto state = mState.Lock();
283 state->mPeers.Remove(aNodeName, &channel);
284 state->mPendingMessages.Remove(aNodeName);
285 state->mInvites.Remove(aNodeName, &invite);
286 state->mPendingMerges.Remove(aNodeName, &pendingMerges);
289 NODECONTROLLER_LOG(LogLevel::Info, "Dropping Peer %s (pid: %" PRIPID ")",
290 ToString(aNodeName).c_str(),
291 channel ? channel->OtherPid() : base::kInvalidProcessId);
293 if (channel) {
294 channel->Close();
296 if (invite.mChannel) {
297 invite.mChannel->Close();
299 if (invite.mToMerge.is_valid()) {
300 // Ignore any possible errors here.
301 (void)mNode->ClosePort(invite.mToMerge);
303 for (auto& port : pendingMerges) {
304 // Ignore any possible errors here.
305 (void)mNode->ClosePort(port);
307 mNode->LostConnectionToNode(aNodeName);
310 void NodeController::ForwardEvent(const NodeName& aNode,
311 UniquePtr<Event> aEvent) {
312 if (aNode == mName) {
313 (void)mNode->AcceptEvent(mName, std::move(aEvent));
314 } else {
315 // On Windows and macOS, messages holding HANDLEs or mach ports must be
316 // relayed via the broker process so it can transfer ownership.
317 bool needsRelay = false;
318 #if defined(XP_WIN) || defined(XP_MACOSX)
319 if (!IsBroker() && aNode != kBrokerNodeName &&
320 aEvent->type() == Event::kUserMessage) {
321 auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get());
322 needsRelay =
323 userEvent->HasMessage() &&
324 userEvent->GetMessage<IPC::Message>()->num_relayed_attachments() > 0;
326 #endif
328 UniquePtr<IPC::Message> message =
329 SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr);
330 MOZ_ASSERT(message->is_relay() == needsRelay,
331 "Message relay status set incorrectly");
333 RefPtr<NodeChannel> peer;
334 RefPtr<NodeChannel> broker;
335 bool needsIntroduction = false;
337 auto state = mState.Lock();
339 // Check if we know this peer. If we don't, we'll need to request an
340 // introduction.
341 peer = state->mPeers.Get(aNode);
342 if (!peer || needsRelay) {
343 if (IsBroker()) {
344 NODECONTROLLER_WARNING("Ignoring message '%s' to unknown peer %s",
345 message->name(), ToString(aNode).c_str());
346 return;
349 broker = state->mPeers.Get(kBrokerNodeName);
350 if (!broker) {
351 NODECONTROLLER_WARNING(
352 "Ignoring message '%s' to peer %s due to a missing broker",
353 message->name(), ToString(aNode).c_str());
354 return;
357 if (!needsRelay) {
358 auto& queue =
359 state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
360 needsIntroduction = true;
361 return Queue<UniquePtr<IPC::Message>, 64>{};
363 queue.Push(std::move(message));
368 MOZ_ASSERT(!needsIntroduction || !needsRelay,
369 "Only one of the two should ever be set");
371 if (needsRelay) {
372 NODECONTROLLER_LOG(LogLevel::Info,
373 "Relaying message '%s' for peer %s due to %" PRIu32
374 " attachments",
375 message->name(), ToString(aNode).c_str(),
376 message->num_relayed_attachments());
377 MOZ_ASSERT(message->num_relayed_attachments() > 0 && broker);
378 broker->SendEventMessage(std::move(message));
379 } else if (needsIntroduction) {
380 MOZ_ASSERT(broker);
381 broker->RequestIntroduction(aNode);
382 } else if (peer) {
383 peer->SendEventMessage(std::move(message));
388 void NodeController::BroadcastEvent(UniquePtr<Event> aEvent) {
389 UniquePtr<IPC::Message> message =
390 SerializeEventMessage(std::move(aEvent), nullptr, BROADCAST_MESSAGE_TYPE);
392 if (IsBroker()) {
393 OnBroadcast(mName, std::move(message));
394 } else if (RefPtr<NodeChannel> broker = GetNodeChannel(kBrokerNodeName)) {
395 broker->Broadcast(std::move(message));
396 } else {
397 NODECONTROLLER_WARNING(
398 "Trying to broadcast event, but no connection to broker");
402 void NodeController::PortStatusChanged(const PortRef& aPortRef) {
403 RefPtr<UserData> userData;
404 int rv = mNode->GetUserData(aPortRef, &userData);
405 if (rv != mojo::core::ports::OK) {
406 NODECONTROLLER_WARNING("GetUserData call for port '%s' failed",
407 ToString(aPortRef.name()).c_str());
408 return;
410 if (userData) {
411 // All instances of `UserData` attached to ports in this node must be of
412 // type `PortObserver`, so we can call `OnPortStatusChanged` directly on
413 // them.
414 static_cast<PortObserver*>(userData.get())->OnPortStatusChanged();
418 void NodeController::OnEventMessage(const NodeName& aFromNode,
419 UniquePtr<IPC::Message> aMessage) {
420 AssertIOThread();
422 bool isRelay = aMessage->is_relay();
423 if (isRelay && aMessage->num_relayed_attachments() == 0) {
424 NODECONTROLLER_WARNING(
425 "Invalid relay message without relayed attachments from peer %s!",
426 ToString(aFromNode).c_str());
427 DropPeer(aFromNode);
428 return;
431 NodeName relayTarget;
432 UniquePtr<Event> event = DeserializeEventMessage(
433 std::move(aMessage), isRelay ? &relayTarget : nullptr);
434 if (!event) {
435 NODECONTROLLER_WARNING("Invalid EventMessage from peer %s!",
436 ToString(aFromNode).c_str());
437 DropPeer(aFromNode);
438 return;
441 NodeName fromNode = aFromNode;
442 #if defined(XP_WIN) || defined(XP_MACOSX)
443 if (isRelay) {
444 if (event->type() != Event::kUserMessage) {
445 NODECONTROLLER_WARNING(
446 "Unexpected relay of non-UserMessage event from peer %s!",
447 ToString(aFromNode).c_str());
448 DropPeer(aFromNode);
449 return;
452 // If we're the broker, then we'll need to forward this message on to the
453 // true recipient. To do this, we re-serialize the message, passing along
454 // the original source node, and send it to the final node.
455 if (IsBroker()) {
456 UniquePtr<IPC::Message> message =
457 SerializeEventMessage(std::move(event), &aFromNode);
458 if (!message) {
459 NODECONTROLLER_WARNING(
460 "Relaying EventMessage from peer %s failed to re-serialize!",
461 ToString(aFromNode).c_str());
462 DropPeer(aFromNode);
463 return;
465 MOZ_ASSERT(message->is_relay(), "Message stopped being a relay message?");
466 MOZ_ASSERT(message->num_relayed_attachments() > 0,
467 "Message doesn't have relayed attachments?");
469 NODECONTROLLER_LOG(
470 LogLevel::Info,
471 "Relaying message '%s' from peer %s to peer %s (%" PRIu32
472 " attachments)",
473 message->name(), ToString(aFromNode).c_str(),
474 ToString(relayTarget).c_str(), message->num_relayed_attachments());
476 RefPtr<NodeChannel> peer;
478 auto state = mState.Lock();
479 peer = state->mPeers.Get(relayTarget);
481 if (!peer) {
482 NODECONTROLLER_WARNING(
483 "Dropping relayed message from %s to unknown peer %s",
484 ToString(aFromNode).c_str(), ToString(relayTarget).c_str());
485 return;
488 peer->SendEventMessage(std::move(message));
489 return;
492 // Otherwise, we're the final recipient, so we can continue & process the
493 // message as usual.
494 if (aFromNode != kBrokerNodeName) {
495 NODECONTROLLER_WARNING(
496 "Unexpected relayed EventMessage from non-broker peer %s!",
497 ToString(aFromNode).c_str());
498 DropPeer(aFromNode);
499 return;
501 fromNode = relayTarget;
503 NODECONTROLLER_LOG(LogLevel::Info, "Got relayed message from peer %s",
504 ToString(fromNode).c_str());
506 #endif
508 // If we're getting a requested port merge from another process, check to make
509 // sure that we're expecting the request, and record that the merge has
510 // arrived so we don't try to close the port on error.
511 if (event->type() == Event::kMergePort) {
512 // Check that the target port for the merge actually exists.
513 auto targetPort = GetPort(event->port_name());
514 if (!targetPort.is_valid()) {
515 NODECONTROLLER_WARNING(
516 "Unexpected MergePortEvent from peer %s for unknown port %s",
517 ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
518 DropPeer(fromNode);
519 return;
522 // Check if `targetPort` is in our pending merges entry for the given source
523 // node. If this makes the `mPendingMerges` entry empty, remove it.
524 bool expectingMerge = [&] {
525 auto state = mState.Lock();
526 auto pendingMerges = state->mPendingMerges.Lookup(aFromNode);
527 if (!pendingMerges) {
528 return false;
530 size_t removed = pendingMerges->RemoveElementsBy(
531 [&](auto& port) { return port.name() == targetPort.name(); });
532 if (removed != 0 && pendingMerges->IsEmpty()) {
533 pendingMerges.Remove();
535 return removed != 0;
536 }();
538 if (!expectingMerge) {
539 NODECONTROLLER_WARNING(
540 "Unexpected MergePortEvent from peer %s for port %s",
541 ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
542 DropPeer(fromNode);
543 return;
547 (void)mNode->AcceptEvent(fromNode, std::move(event));
550 void NodeController::OnBroadcast(const NodeName& aFromNode,
551 UniquePtr<IPC::Message> aMessage) {
552 MOZ_DIAGNOSTIC_ASSERT(aMessage->type() == BROADCAST_MESSAGE_TYPE);
554 // NOTE: This method may be called off of the IO thread by the
555 // `BroadcastEvent` node callback.
556 if (!IsBroker()) {
557 NODECONTROLLER_WARNING("Broadcast request received by non-broker node");
558 return;
561 UniquePtr<Event> event = DeserializeEventMessage(std::move(aMessage));
562 if (!event) {
563 NODECONTROLLER_WARNING("Invalid broadcast message from peer");
564 return;
567 nsTArray<RefPtr<NodeChannel>> peers;
569 auto state = mState.Lock();
570 peers.SetCapacity(state->mPeers.Count());
571 for (const auto& peer : state->mPeers.Values()) {
572 peers.AppendElement(peer);
575 for (const auto& peer : peers) {
576 // NOTE: This `clone` operation is only supported for a limited number of
577 // message types by the ports API, which provides some extra security by
578 // only allowing those specific types of messages to be broadcasted.
579 // Messages which don't support `CloneForBroadcast` cannot be broadcast, and
580 // the ports library will not attempt to broadcast them.
581 auto clone = event->CloneForBroadcast();
582 if (!clone) {
583 NODECONTROLLER_WARNING("Attempt to broadcast unsupported message");
584 break;
587 peer->SendEventMessage(SerializeEventMessage(std::move(clone)));
591 void NodeController::OnIntroduce(const NodeName& aFromNode,
592 NodeChannel::Introduction aIntroduction) {
593 AssertIOThread();
595 if (aFromNode != kBrokerNodeName) {
596 NODECONTROLLER_WARNING("Introduction received from non-broker node");
597 DropPeer(aFromNode);
598 return;
601 MOZ_ASSERT(aIntroduction.mMyPid == base::GetCurrentProcId(),
602 "We're the wrong process to receive this?");
604 if (!aIntroduction.mHandle) {
605 NODECONTROLLER_WARNING("Could not be introduced to peer %s",
606 ToString(aIntroduction.mName).c_str());
607 mNode->LostConnectionToNode(aIntroduction.mName);
609 auto state = mState.Lock();
610 state->mPendingMessages.Remove(aIntroduction.mName);
611 return;
614 auto channel =
615 MakeUnique<IPC::Channel>(std::move(aIntroduction.mHandle),
616 aIntroduction.mMode, aIntroduction.mOtherPid);
617 auto nodeChannel = MakeRefPtr<NodeChannel>(
618 aIntroduction.mName, std::move(channel), this, aIntroduction.mOtherPid);
621 auto state = mState.Lock();
622 bool isNew = false;
623 state->mPeers.LookupOrInsertWith(aIntroduction.mName, [&]() {
624 isNew = true;
625 return nodeChannel;
627 if (!isNew) {
628 // We got a duplicate introduction. This can happen during normal
629 // execution if both sides request an introduction at the same time. We
630 // can just ignore the second one, as they'll arrive in the same order in
631 // both processes.
632 nodeChannel->Close();
633 return;
636 // Deliver any pending messages, then remove the entry from our table. We do
637 // this while `mState` is still held to ensure that these messages are
638 // all sent before another thread can observe the newly created channel.
639 // As the channel hasn't been `Connect()`-ed yet, this will only queue the
640 // messages up to be sent, so is OK to do with the mutex held. These
641 // messages will be processed to be sent during `Start()` below, which is
642 // performed outside of the lock.
643 if (auto pending = state->mPendingMessages.Lookup(aIntroduction.mName)) {
644 while (!pending->IsEmpty()) {
645 nodeChannel->SendEventMessage(pending->Pop());
647 pending.Remove();
651 // NodeChannel::Start must be called with the lock not held, as it may lead to
652 // callbacks being made into `OnChannelError` or `OnMessageReceived`, which
653 // will attempt to re-acquire our lock.
654 nodeChannel->Start();
657 void NodeController::OnRequestIntroduction(const NodeName& aFromNode,
658 const NodeName& aName) {
659 AssertIOThread();
660 if (NS_WARN_IF(!IsBroker())) {
661 return;
664 RefPtr<NodeChannel> peerA = GetNodeChannel(aFromNode);
665 if (!peerA || aName == mojo::core::ports::kInvalidNodeName) {
666 NODECONTROLLER_WARNING("Invalid OnRequestIntroduction message from node %s",
667 ToString(aFromNode).c_str());
668 DropPeer(aFromNode);
669 return;
672 RefPtr<NodeChannel> peerB = GetNodeChannel(aName);
673 IPC::Channel::ChannelHandle handleA, handleB;
674 if (!peerB || !IPC::Channel::CreateRawPipe(&handleA, &handleB)) {
675 NODECONTROLLER_WARNING(
676 "Rejecting introduction request from '%s' for unknown peer '%s'",
677 ToString(aFromNode).c_str(), ToString(aName).c_str());
679 // We don't know this peer, or ran into issues creating the descriptor! Send
680 // an invalid introduction to content to clean up any pending outbound
681 // messages.
682 NodeChannel::Introduction intro{aName, nullptr, IPC::Channel::MODE_SERVER,
683 peerA->OtherPid(), base::kInvalidProcessId};
684 peerA->Introduce(std::move(intro));
685 return;
688 NodeChannel::Introduction introA{aName, std::move(handleA),
689 IPC::Channel::MODE_SERVER, peerA->OtherPid(),
690 peerB->OtherPid()};
691 NodeChannel::Introduction introB{aFromNode, std::move(handleB),
692 IPC::Channel::MODE_CLIENT, peerB->OtherPid(),
693 peerA->OtherPid()};
694 peerA->Introduce(std::move(introA));
695 peerB->Introduce(std::move(introB));
698 void NodeController::OnAcceptInvite(const NodeName& aFromNode,
699 const NodeName& aRealName,
700 const PortName& aInitialPort) {
701 AssertIOThread();
702 if (!IsBroker()) {
703 NODECONTROLLER_WARNING("Ignoring AcceptInvite message as non-broker");
704 return;
707 if (aRealName == mojo::core::ports::kInvalidNodeName ||
708 aInitialPort == mojo::core::ports::kInvalidPortName) {
709 NODECONTROLLER_WARNING("Invalid name in AcceptInvite message");
710 DropPeer(aFromNode);
711 return;
714 bool inserted = false;
715 Invite invite;
717 auto state = mState.Lock();
718 MOZ_ASSERT(state->mPendingMessages.IsEmpty(),
719 "Shouldn't have pending messages in broker");
721 // Try to remove the source node from our invites list and insert it into
722 // our peers map under the new name.
723 if (state->mInvites.Remove(aFromNode, &invite)) {
724 MOZ_ASSERT(invite.mChannel && invite.mToMerge.is_valid());
725 state->mPeers.LookupOrInsertWith(aRealName, [&]() {
726 inserted = true;
727 return invite.mChannel;
731 if (!inserted) {
732 NODECONTROLLER_WARNING("Invalid AcceptInvite message from node %s",
733 ToString(aFromNode).c_str());
734 DropPeer(aFromNode);
735 return;
738 // Update the name of the node. This field is only accessed from the IO
739 // thread, so it's safe to update it without a lock held.
740 invite.mChannel->SetName(aRealName);
742 // Start the port merge to allow our existing initial port to begin
743 // communicating with the remote port.
744 PORTS_ALWAYS_OK(mNode->MergePorts(invite.mToMerge, aRealName, aInitialPort));
747 void NodeController::OnChannelError(const NodeName& aFromNode) {
748 AssertIOThread();
749 DropPeer(aFromNode);
752 static mojo::core::ports::NodeName RandomNodeName() {
753 return {RandomUint64OrDie(), RandomUint64OrDie()};
756 std::tuple<ScopedPort, RefPtr<NodeChannel>> NodeController::InviteChildProcess(
757 UniquePtr<IPC::Channel> aChannel,
758 GeckoChildProcessHost* aChildProcessHost) {
759 MOZ_ASSERT(IsBroker());
760 AssertIOThread();
762 // Create the peer with a randomly generated name, and store it in `mInvites`.
763 // This channel and name will be used for communication with the node until it
764 // sends us its' real name in an `AcceptInvite` message.
765 auto ports = CreatePortPair();
766 auto inviteName = RandomNodeName();
767 auto nodeChannel =
768 MakeRefPtr<NodeChannel>(inviteName, std::move(aChannel), this,
769 base::kInvalidProcessId, aChildProcessHost);
771 auto state = mState.Lock();
772 MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(inviteName),
773 "UUID conflict?");
774 MOZ_DIAGNOSTIC_ASSERT(!state->mInvites.Contains(inviteName),
775 "UUID conflict?");
776 state->mInvites.InsertOrUpdate(inviteName,
777 Invite{nodeChannel, ports.second.Release()});
780 nodeChannel->Start();
781 return std::tuple{std::move(ports.first), std::move(nodeChannel)};
784 void NodeController::InitBrokerProcess() {
785 AssertIOThread();
786 MOZ_ASSERT(!gNodeController);
787 gNodeController = new NodeController(kBrokerNodeName);
790 ScopedPort NodeController::InitChildProcess(UniquePtr<IPC::Channel> aChannel,
791 base::ProcessId aParentPid) {
792 AssertIOThread();
793 MOZ_ASSERT(!gNodeController);
795 auto nodeName = RandomNodeName();
796 gNodeController = new NodeController(nodeName);
798 auto ports = gNodeController->CreatePortPair();
799 PortRef toMerge = ports.second.Release();
801 // Mark the port as expecting a pending merge. This is a duplicate of the
802 // information tracked by `mPendingMerges`, and was added by upstream
803 // chromium.
804 // See https://chromium-review.googlesource.com/c/chromium/src/+/3289065
806 mojo::core::ports::SinglePortLocker locker(&toMerge);
807 locker.port()->pending_merge_peer = true;
810 auto nodeChannel = MakeRefPtr<NodeChannel>(
811 kBrokerNodeName, std::move(aChannel), gNodeController, aParentPid);
813 auto state = gNodeController->mState.Lock();
814 MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(kBrokerNodeName));
815 state->mPeers.InsertOrUpdate(kBrokerNodeName, nodeChannel);
816 MOZ_DIAGNOSTIC_ASSERT(!state->mPendingMerges.Contains(kBrokerNodeName));
817 state->mPendingMerges.LookupOrInsert(kBrokerNodeName)
818 .AppendElement(toMerge);
821 nodeChannel->Start();
822 nodeChannel->AcceptInvite(nodeName, toMerge.name());
823 return std::move(ports.first);
826 void NodeController::CleanUp() {
827 AssertIOThread();
828 MOZ_ASSERT(gNodeController);
830 RefPtr<NodeController> nodeController = gNodeController;
831 gNodeController = nullptr;
833 // Collect all objects from our state which need to be cleaned up.
834 nsTArray<NodeName> lostConnections;
835 nsTArray<RefPtr<NodeChannel>> channelsToClose;
836 nsTArray<PortRef> portsToClose;
838 auto state = nodeController->mState.Lock();
839 for (const auto& chan : state->mPeers) {
840 lostConnections.AppendElement(chan.GetKey());
841 channelsToClose.AppendElement(chan.GetData());
843 for (const auto& invite : state->mInvites.Values()) {
844 channelsToClose.AppendElement(invite.mChannel);
845 portsToClose.AppendElement(invite.mToMerge);
847 for (const auto& pendingPorts : state->mPendingMerges.Values()) {
848 portsToClose.AppendElements(pendingPorts);
850 state->mPeers.Clear();
851 state->mPendingMessages.Clear();
852 state->mInvites.Clear();
853 state->mPendingMerges.Clear();
855 for (auto& nodeChannel : channelsToClose) {
856 nodeChannel->Close();
858 for (auto& port : portsToClose) {
859 nodeController->mNode->ClosePort(port);
861 for (auto& name : lostConnections) {
862 nodeController->mNode->LostConnectionToNode(name);
866 #undef NODECONTROLLER_LOG
867 #undef NODECONTROLLER_WARNING
869 } // namespace mozilla::ipc