Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / ipc / mojo / ipc_channel_mojo.cc
blobff20719dd7d7ff41bb8939aa297af23bca759968
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/mojo/ipc_channel_mojo.h"
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/thread_task_runner_handle.h"
11 #include "ipc/ipc_listener.h"
12 #include "ipc/ipc_logging.h"
13 #include "ipc/ipc_message_attachment_set.h"
14 #include "ipc/ipc_message_macros.h"
15 #include "ipc/mojo/client_channel.mojom.h"
16 #include "ipc/mojo/ipc_mojo_bootstrap.h"
17 #include "ipc/mojo/ipc_mojo_handle_attachment.h"
18 #include "third_party/mojo/src/mojo/edk/embedder/embedder.h"
19 #include "third_party/mojo/src/mojo/public/cpp/bindings/binding.h"
21 #if defined(OS_POSIX) && !defined(OS_NACL)
22 #include "ipc/ipc_platform_file_attachment_posix.h"
23 #endif
25 namespace IPC {
27 namespace {
29 class MojoChannelFactory : public ChannelFactory {
30 public:
31 MojoChannelFactory(scoped_refptr<base::TaskRunner> io_runner,
32 ChannelHandle channel_handle,
33 Channel::Mode mode,
34 AttachmentBroker* broker)
35 : io_runner_(io_runner),
36 channel_handle_(channel_handle),
37 mode_(mode),
38 broker_(broker) {}
40 std::string GetName() const override {
41 return channel_handle_.name;
44 scoped_ptr<Channel> BuildChannel(Listener* listener) override {
45 return ChannelMojo::Create(io_runner_, channel_handle_, mode_, listener,
46 broker_);
49 private:
50 scoped_refptr<base::TaskRunner> io_runner_;
51 ChannelHandle channel_handle_;
52 Channel::Mode mode_;
53 AttachmentBroker* broker_;
56 //------------------------------------------------------------------------------
58 class ClientChannelMojo : public ChannelMojo, public ClientChannel {
59 public:
60 ClientChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
61 const ChannelHandle& handle,
62 Listener* listener,
63 AttachmentBroker* broker);
64 ~ClientChannelMojo() override;
65 // MojoBootstrap::Delegate implementation
66 void OnPipeAvailable(mojo::embedder::ScopedPlatformHandle handle) override;
68 // ClientChannel implementation
69 void Init(
70 mojo::ScopedMessagePipeHandle pipe,
71 int32_t peer_pid,
72 const mojo::Callback<void(int32_t)>& callback) override;
74 private:
75 void BindPipe(mojo::ScopedMessagePipeHandle handle);
76 void OnConnectionError();
78 mojo::Binding<ClientChannel> binding_;
79 base::WeakPtrFactory<ClientChannelMojo> weak_factory_;
81 DISALLOW_COPY_AND_ASSIGN(ClientChannelMojo);
84 ClientChannelMojo::ClientChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
85 const ChannelHandle& handle,
86 Listener* listener,
87 AttachmentBroker* broker)
88 : ChannelMojo(io_runner, handle, Channel::MODE_CLIENT, listener, broker),
89 binding_(this),
90 weak_factory_(this) {
93 ClientChannelMojo::~ClientChannelMojo() {
96 void ClientChannelMojo::OnPipeAvailable(
97 mojo::embedder::ScopedPlatformHandle handle) {
98 CreateMessagingPipe(handle.Pass(), base::Bind(&ClientChannelMojo::BindPipe,
99 weak_factory_.GetWeakPtr()));
102 void ClientChannelMojo::Init(
103 mojo::ScopedMessagePipeHandle pipe,
104 int32_t peer_pid,
105 const mojo::Callback<void(int32_t)>& callback) {
106 InitMessageReader(pipe.Pass(), static_cast<base::ProcessId>(peer_pid));
107 callback.Run(GetSelfPID());
110 void ClientChannelMojo::BindPipe(mojo::ScopedMessagePipeHandle handle) {
111 binding_.Bind(handle.Pass());
114 void ClientChannelMojo::OnConnectionError() {
115 listener()->OnChannelError();
118 //------------------------------------------------------------------------------
120 class ServerChannelMojo : public ChannelMojo {
121 public:
122 ServerChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
123 const ChannelHandle& handle,
124 Listener* listener,
125 AttachmentBroker* broker);
126 ~ServerChannelMojo() override;
128 // MojoBootstrap::Delegate implementation
129 void OnPipeAvailable(mojo::embedder::ScopedPlatformHandle handle) override;
130 // Channel override
131 void Close() override;
133 private:
134 void InitClientChannel(mojo::ScopedMessagePipeHandle peer_handle,
135 mojo::ScopedMessagePipeHandle handle);
136 void OnConnectionError();
138 // ClientChannelClient implementation
139 void ClientChannelWasInitialized(int32_t peer_pid);
141 mojo::InterfacePtr<ClientChannel> client_channel_;
142 mojo::ScopedMessagePipeHandle message_pipe_;
143 base::WeakPtrFactory<ServerChannelMojo> weak_factory_;
145 DISALLOW_COPY_AND_ASSIGN(ServerChannelMojo);
148 ServerChannelMojo::ServerChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
149 const ChannelHandle& handle,
150 Listener* listener,
151 AttachmentBroker* broker)
152 : ChannelMojo(io_runner, handle, Channel::MODE_SERVER, listener, broker),
153 weak_factory_(this) {
156 ServerChannelMojo::~ServerChannelMojo() {
157 Close();
160 void ServerChannelMojo::OnPipeAvailable(
161 mojo::embedder::ScopedPlatformHandle handle) {
162 mojo::ScopedMessagePipeHandle peer;
163 MojoResult create_result =
164 mojo::CreateMessagePipe(nullptr, &message_pipe_, &peer);
165 if (create_result != MOJO_RESULT_OK) {
166 LOG(WARNING) << "mojo::CreateMessagePipe failed: " << create_result;
167 listener()->OnChannelError();
168 return;
170 CreateMessagingPipe(
171 handle.Pass(),
172 base::Bind(&ServerChannelMojo::InitClientChannel,
173 weak_factory_.GetWeakPtr(), base::Passed(&peer)));
176 void ServerChannelMojo::Close() {
177 client_channel_.reset();
178 message_pipe_.reset();
179 ChannelMojo::Close();
182 void ServerChannelMojo::InitClientChannel(
183 mojo::ScopedMessagePipeHandle peer_handle,
184 mojo::ScopedMessagePipeHandle handle) {
185 client_channel_.Bind(
186 mojo::InterfacePtrInfo<ClientChannel>(handle.Pass(), 0u));
187 client_channel_.set_connection_error_handler(base::Bind(
188 &ServerChannelMojo::OnConnectionError, base::Unretained(this)));
189 client_channel_->Init(
190 peer_handle.Pass(), static_cast<int32_t>(GetSelfPID()),
191 base::Bind(&ServerChannelMojo::ClientChannelWasInitialized,
192 base::Unretained(this)));
195 void ServerChannelMojo::OnConnectionError() {
196 listener()->OnChannelError();
199 void ServerChannelMojo::ClientChannelWasInitialized(int32_t peer_pid) {
200 InitMessageReader(message_pipe_.Pass(), peer_pid);
#if defined(OS_POSIX) && !defined(OS_NACL)

// Returns a descriptor for |attachment|'s file: the attachment's own
// descriptor when it owns it, otherwise a dup() of it. The result is invalid
// if dup() fails.
base::ScopedFD TakeOrDupFile(internal::PlatformFileAttachment* attachment) {
  if (attachment->Owns())
    return base::ScopedFD(attachment->TakePlatformFile());
  return base::ScopedFD(dup(attachment->file()));
}

#endif
212 } // namespace
214 //------------------------------------------------------------------------------
216 ChannelMojo::ChannelInfoDeleter::ChannelInfoDeleter(
217 scoped_refptr<base::TaskRunner> io_runner)
218 : io_runner(io_runner) {
221 ChannelMojo::ChannelInfoDeleter::~ChannelInfoDeleter() {
224 void ChannelMojo::ChannelInfoDeleter::operator()(
225 mojo::embedder::ChannelInfo* ptr) const {
226 if (base::ThreadTaskRunnerHandle::Get() == io_runner) {
227 mojo::embedder::DestroyChannelOnIOThread(ptr);
228 } else {
229 io_runner->PostTask(
230 FROM_HERE, base::Bind(&mojo::embedder::DestroyChannelOnIOThread, ptr));
234 //------------------------------------------------------------------------------
236 // static
237 bool ChannelMojo::ShouldBeUsed() {
238 // TODO(rockot): Investigate performance bottlenecks and hopefully reenable
239 // this at some point. http://crbug.com/500019
240 return false;
243 // static
244 scoped_ptr<ChannelMojo> ChannelMojo::Create(
245 scoped_refptr<base::TaskRunner> io_runner,
246 const ChannelHandle& channel_handle,
247 Mode mode,
248 Listener* listener,
249 AttachmentBroker* broker) {
250 switch (mode) {
251 case Channel::MODE_CLIENT:
252 return make_scoped_ptr(
253 new ClientChannelMojo(io_runner, channel_handle, listener, broker));
254 case Channel::MODE_SERVER:
255 return make_scoped_ptr(
256 new ServerChannelMojo(io_runner, channel_handle, listener, broker));
257 default:
258 NOTREACHED();
259 return nullptr;
263 // static
264 scoped_ptr<ChannelFactory> ChannelMojo::CreateServerFactory(
265 scoped_refptr<base::TaskRunner> io_runner,
266 const ChannelHandle& channel_handle,
267 AttachmentBroker* broker) {
268 return make_scoped_ptr(new MojoChannelFactory(io_runner, channel_handle,
269 Channel::MODE_SERVER, broker));
272 // static
273 scoped_ptr<ChannelFactory> ChannelMojo::CreateClientFactory(
274 scoped_refptr<base::TaskRunner> io_runner,
275 const ChannelHandle& channel_handle,
276 AttachmentBroker* broker) {
277 return make_scoped_ptr(new MojoChannelFactory(io_runner, channel_handle,
278 Channel::MODE_CLIENT, broker));
281 ChannelMojo::ChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
282 const ChannelHandle& handle,
283 Mode mode,
284 Listener* listener,
285 AttachmentBroker* broker)
286 : listener_(listener),
287 peer_pid_(base::kNullProcessId),
288 io_runner_(io_runner),
289 channel_info_(nullptr, ChannelInfoDeleter(nullptr)),
290 waiting_connect_(true),
291 weak_factory_(this) {
292 // Create MojoBootstrap after all members are set as it touches
293 // ChannelMojo from a different thread.
294 bootstrap_ = MojoBootstrap::Create(handle, mode, this, broker);
295 if (io_runner == base::MessageLoop::current()->task_runner()) {
296 InitOnIOThread();
297 } else {
298 io_runner->PostTask(FROM_HERE, base::Bind(&ChannelMojo::InitOnIOThread,
299 base::Unretained(this)));
303 ChannelMojo::~ChannelMojo() {
304 Close();
307 void ChannelMojo::InitOnIOThread() {
308 ipc_support_.reset(
309 new ScopedIPCSupport(base::MessageLoop::current()->task_runner()));
312 void ChannelMojo::CreateMessagingPipe(
313 mojo::embedder::ScopedPlatformHandle handle,
314 const CreateMessagingPipeCallback& callback) {
315 auto return_callback = base::Bind(&ChannelMojo::OnMessagingPipeCreated,
316 weak_factory_.GetWeakPtr(), callback);
317 if (base::ThreadTaskRunnerHandle::Get() == io_runner_) {
318 CreateMessagingPipeOnIOThread(
319 handle.Pass(), base::ThreadTaskRunnerHandle::Get(), return_callback);
320 } else {
321 io_runner_->PostTask(
322 FROM_HERE,
323 base::Bind(&ChannelMojo::CreateMessagingPipeOnIOThread,
324 base::Passed(&handle), base::ThreadTaskRunnerHandle::Get(),
325 return_callback));
329 // static
330 void ChannelMojo::CreateMessagingPipeOnIOThread(
331 mojo::embedder::ScopedPlatformHandle handle,
332 scoped_refptr<base::TaskRunner> callback_runner,
333 const CreateMessagingPipeOnIOThreadCallback& callback) {
334 mojo::embedder::ChannelInfo* channel_info;
335 mojo::ScopedMessagePipeHandle pipe =
336 mojo::embedder::CreateChannelOnIOThread(handle.Pass(), &channel_info);
337 if (base::ThreadTaskRunnerHandle::Get() == callback_runner) {
338 callback.Run(pipe.Pass(), channel_info);
339 } else {
340 callback_runner->PostTask(
341 FROM_HERE, base::Bind(callback, base::Passed(&pipe), channel_info));
345 void ChannelMojo::OnMessagingPipeCreated(
346 const CreateMessagingPipeCallback& callback,
347 mojo::ScopedMessagePipeHandle handle,
348 mojo::embedder::ChannelInfo* channel_info) {
349 DCHECK(!channel_info_.get());
350 channel_info_ = scoped_ptr<mojo::embedder::ChannelInfo, ChannelInfoDeleter>(
351 channel_info, ChannelInfoDeleter(io_runner_));
352 callback.Run(handle.Pass());
355 bool ChannelMojo::Connect() {
356 DCHECK(!message_reader_);
357 return bootstrap_->Connect();
360 void ChannelMojo::Close() {
361 scoped_ptr<internal::MessagePipeReader, ReaderDeleter> to_be_deleted;
364 // |message_reader_| has to be cleared inside the lock,
365 // but the instance has to be deleted outside.
366 base::AutoLock l(lock_);
367 to_be_deleted = message_reader_.Pass();
368 // We might Close() before we Connect().
369 waiting_connect_ = false;
372 channel_info_.reset();
373 ipc_support_.reset();
374 to_be_deleted.reset();
377 void ChannelMojo::OnBootstrapError() {
378 listener_->OnChannelError();
381 namespace {
383 // ClosingDeleter calls |CloseWithErrorIfPending| before deleting the
384 // |MessagePipeReader|.
385 struct ClosingDeleter {
386 typedef base::DefaultDeleter<internal::MessagePipeReader> DefaultType;
388 void operator()(internal::MessagePipeReader* ptr) const {
389 ptr->CloseWithErrorIfPending();
390 delete ptr;
394 } // namespace
396 void ChannelMojo::InitMessageReader(mojo::ScopedMessagePipeHandle pipe,
397 int32_t peer_pid) {
398 scoped_ptr<internal::MessagePipeReader, ClosingDeleter> reader(
399 new internal::MessagePipeReader(pipe.Pass(), this));
402 base::AutoLock l(lock_);
403 for (size_t i = 0; i < pending_messages_.size(); ++i) {
404 bool sent = reader->Send(make_scoped_ptr(pending_messages_[i]));
405 pending_messages_[i] = nullptr;
406 if (!sent) {
407 // OnChannelError() is notified through ClosingDeleter.
408 pending_messages_.clear();
409 LOG(ERROR) << "Failed to flush pending messages";
410 return;
414 // We set |message_reader_| here and won't get any |pending_messages_|
415 // hereafter. Although we might have some if there is an error, we don't
416 // care. They cannot be sent anyway.
417 message_reader_.reset(reader.release());
418 pending_messages_.clear();
419 waiting_connect_ = false;
422 set_peer_pid(peer_pid);
423 listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
424 if (message_reader_)
425 message_reader_->ReadMessagesThenWait();
428 void ChannelMojo::OnPipeClosed(internal::MessagePipeReader* reader) {
429 Close();
432 void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) {
433 listener_->OnChannelError();
437 // Warning: Keep the implementation thread-safe.
438 bool ChannelMojo::Send(Message* message) {
439 base::AutoLock l(lock_);
440 if (!message_reader_) {
441 pending_messages_.push_back(message);
442 // Counts as OK before the connection is established, but it's an
443 // error otherwise.
444 return waiting_connect_;
447 return message_reader_->Send(make_scoped_ptr(message));
450 bool ChannelMojo::IsSendThreadSafe() const {
451 return true;
454 base::ProcessId ChannelMojo::GetPeerPID() const {
455 return peer_pid_;
458 base::ProcessId ChannelMojo::GetSelfPID() const {
459 return bootstrap_->GetSelfPID();
462 void ChannelMojo::OnMessageReceived(Message& message) {
463 TRACE_EVENT2("ipc,toplevel", "ChannelMojo::OnMessageReceived",
464 "class", IPC_MESSAGE_ID_CLASS(message.type()),
465 "line", IPC_MESSAGE_ID_LINE(message.type()));
466 listener_->OnMessageReceived(message);
467 if (message.dispatch_error())
468 listener_->OnBadMessageReceived(message);
471 #if defined(OS_POSIX) && !defined(OS_NACL)
472 int ChannelMojo::GetClientFileDescriptor() const {
473 return bootstrap_->GetClientFileDescriptor();
476 base::ScopedFD ChannelMojo::TakeClientFileDescriptor() {
477 return bootstrap_->TakeClientFileDescriptor();
479 #endif // defined(OS_POSIX) && !defined(OS_NACL)
481 // static
482 MojoResult ChannelMojo::ReadFromMessageAttachmentSet(
483 Message* message,
484 std::vector<MojoHandle>* handles) {
485 // We dup() the handles in IPC::Message to transmit.
486 // IPC::MessageAttachmentSet has intricate lifecycle semantics
487 // of FDs, so just to dup()-and-own them is the safest option.
488 if (message->HasAttachments()) {
489 MessageAttachmentSet* set = message->attachment_set();
490 for (unsigned i = 0; i < set->size(); ++i) {
491 scoped_refptr<MessageAttachment> attachment = set->GetAttachmentAt(i);
492 switch (attachment->GetType()) {
493 case MessageAttachment::TYPE_PLATFORM_FILE:
494 #if defined(OS_POSIX) && !defined(OS_NACL)
496 base::ScopedFD file =
497 TakeOrDupFile(static_cast<IPC::internal::PlatformFileAttachment*>(
498 attachment.get()));
499 if (!file.is_valid()) {
500 DPLOG(WARNING) << "Failed to dup FD to transmit.";
501 set->CommitAll();
502 return MOJO_RESULT_UNKNOWN;
505 MojoHandle wrapped_handle;
506 MojoResult wrap_result = CreatePlatformHandleWrapper(
507 mojo::embedder::ScopedPlatformHandle(
508 mojo::embedder::PlatformHandle(file.release())),
509 &wrapped_handle);
510 if (MOJO_RESULT_OK != wrap_result) {
511 LOG(WARNING) << "Pipe failed to wrap handles. Closing: "
512 << wrap_result;
513 set->CommitAll();
514 return wrap_result;
517 handles->push_back(wrapped_handle);
519 #else
520 NOTREACHED();
521 #endif // defined(OS_POSIX) && !defined(OS_NACL)
522 break;
523 case MessageAttachment::TYPE_MOJO_HANDLE: {
524 mojo::ScopedHandle handle =
525 static_cast<IPC::internal::MojoHandleAttachment*>(
526 attachment.get())->TakeHandle();
527 handles->push_back(handle.release().value());
528 } break;
529 case MessageAttachment::TYPE_BROKERABLE_ATTACHMENT:
530 // Brokerable attachments are handled by the AttachmentBroker so
531 // there's no need to do anything here.
532 NOTREACHED();
533 break;
537 set->CommitAll();
540 return MOJO_RESULT_OK;
543 // static
544 MojoResult ChannelMojo::WriteToMessageAttachmentSet(
545 const std::vector<MojoHandle>& handle_buffer,
546 Message* message) {
547 for (size_t i = 0; i < handle_buffer.size(); ++i) {
548 bool ok = message->attachment_set()->AddAttachment(
549 new IPC::internal::MojoHandleAttachment(
550 mojo::MakeScopedHandle(mojo::Handle(handle_buffer[i]))));
551 DCHECK(ok);
552 if (!ok) {
553 LOG(ERROR) << "Failed to add new Mojo handle.";
554 return MOJO_RESULT_UNKNOWN;
558 return MOJO_RESULT_OK;
561 } // namespace IPC