1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/edk/system/raw_channel.h"
14 #include "base/bind.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/macros.h"
18 #include "base/memory/scoped_ptr.h"
19 #include "base/memory/weak_ptr.h"
20 #include "base/message_loop/message_loop.h"
21 #include "base/synchronization/lock.h"
22 #include "mojo/edk/embedder/platform_channel_utils_posix.h"
23 #include "mojo/edk/embedder/platform_handle.h"
24 #include "mojo/edk/embedder/platform_handle_vector.h"
25 #include "mojo/edk/system/transport_data.h"
32 class RawChannelPosix
: public RawChannel
,
33 public base::MessageLoopForIO::Watcher
{
35 explicit RawChannelPosix(embedder::ScopedPlatformHandle handle
);
36 ~RawChannelPosix() override
;
38 // |RawChannel| public methods:
39 size_t GetSerializedPlatformHandleSize() const override
;
42 // |RawChannel| protected methods:
43 // Actually override this so that we can send multiple messages with (only)
45 void EnqueueMessageNoLock(scoped_ptr
<MessageInTransit
> message
) override
;
46 // Override this to handle those extra FD-only messages.
47 bool OnReadMessageForRawChannel(
48 const MessageInTransit::View
& message_view
) override
;
49 IOResult
Read(size_t* bytes_read
) override
;
50 IOResult
ScheduleRead() override
;
51 embedder::ScopedPlatformHandleVectorPtr
GetReadPlatformHandles(
52 size_t num_platform_handles
,
53 const void* platform_handle_table
) override
;
54 IOResult
WriteNoLock(size_t* platform_handles_written
,
55 size_t* bytes_written
) override
;
56 IOResult
ScheduleWriteNoLock() override
;
57 bool OnInit() override
;
58 void OnShutdownNoLock(scoped_ptr
<ReadBuffer
> read_buffer
,
59 scoped_ptr
<WriteBuffer
> write_buffer
) override
;
61 // |base::MessageLoopForIO::Watcher| implementation:
62 void OnFileCanReadWithoutBlocking(int fd
) override
;
63 void OnFileCanWriteWithoutBlocking(int fd
) override
;
65 // Implements most of |Read()| (except for a bit of clean-up):
66 IOResult
ReadImpl(size_t* bytes_read
);
68 // Watches for |fd_| to become writable. Must be called on the I/O thread.
71 embedder::ScopedPlatformHandle fd_
;
73 // The following members are only used on the I/O thread:
74 scoped_ptr
<base::MessageLoopForIO::FileDescriptorWatcher
> read_watcher_
;
75 scoped_ptr
<base::MessageLoopForIO::FileDescriptorWatcher
> write_watcher_
;
79 std::deque
<embedder::PlatformHandle
> read_platform_handles_
;
81 // The following members are used on multiple threads and protected by
85 // This is used for posting tasks from write threads to the I/O thread. It
86 // must only be accessed under |write_lock_|. The weak pointers it produces
87 // are only used/invalidated on the I/O thread.
88 base::WeakPtrFactory
<RawChannelPosix
> weak_ptr_factory_
;
90 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix
);
93 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle
)
96 pending_write_(false),
97 weak_ptr_factory_(this) {
98 DCHECK(fd_
.is_valid());
101 RawChannelPosix::~RawChannelPosix() {
102 DCHECK(!pending_read_
);
103 DCHECK(!pending_write_
);
105 // No need to take the |write_lock()| here -- if there are still weak pointers
106 // outstanding, then we're hosed anyway (since we wouldn't be able to
107 // invalidate them cleanly, since we might not be on the I/O thread).
108 DCHECK(!weak_ptr_factory_
.HasWeakPtrs());
110 // These must have been shut down/destroyed on the I/O thread.
111 DCHECK(!read_watcher_
);
112 DCHECK(!write_watcher_
);
114 embedder::CloseAllPlatformHandles(&read_platform_handles_
);
117 size_t RawChannelPosix::GetSerializedPlatformHandleSize() const {
118 // We don't actually need any space on POSIX (since we just send FDs).
122 void RawChannelPosix::EnqueueMessageNoLock(
123 scoped_ptr
<MessageInTransit
> message
) {
124 if (message
->transport_data()) {
125 embedder::PlatformHandleVector
* const platform_handles
=
126 message
->transport_data()->platform_handles();
127 if (platform_handles
&&
128 platform_handles
->size() > embedder::kPlatformChannelMaxNumHandles
) {
129 // We can't attach all the FDs to a single message, so we have to "split"
130 // the message. Send as many control messages as needed first with FDs
131 // attached (and no data).
133 for (; platform_handles
->size() - i
>
134 embedder::kPlatformChannelMaxNumHandles
;
135 i
+= embedder::kPlatformChannelMaxNumHandles
) {
136 scoped_ptr
<MessageInTransit
> fd_message(new MessageInTransit(
137 MessageInTransit::kTypeRawChannel
,
138 MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles
,
141 embedder::ScopedPlatformHandleVectorPtr
fds(
142 new embedder::PlatformHandleVector(
143 platform_handles
->begin() + i
,
144 platform_handles
->begin() + i
+
145 embedder::kPlatformChannelMaxNumHandles
));
146 fd_message
->SetTransportData(
147 make_scoped_ptr(new TransportData(fds
.Pass())));
148 RawChannel::EnqueueMessageNoLock(fd_message
.Pass());
151 // Remove the handles that we "moved" into the other messages.
152 platform_handles
->erase(platform_handles
->begin(),
153 platform_handles
->begin() + i
);
157 RawChannel::EnqueueMessageNoLock(message
.Pass());
160 bool RawChannelPosix::OnReadMessageForRawChannel(
161 const MessageInTransit::View
& message_view
) {
162 DCHECK_EQ(message_view
.type(), MessageInTransit::kTypeRawChannel
);
164 if (message_view
.subtype() ==
165 MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles
) {
166 // We don't need to do anything. |RawChannel| won't extract the platform
167 // handles, and they'll be accumulated in |Read()|.
171 return RawChannel::OnReadMessageForRawChannel(message_view
);
174 RawChannel::IOResult
RawChannelPosix::Read(size_t* bytes_read
) {
175 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
176 DCHECK(!pending_read_
);
178 IOResult rv
= ReadImpl(bytes_read
);
179 if (rv
!= IO_SUCCEEDED
&& rv
!= IO_PENDING
) {
180 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
181 read_watcher_
.reset();
186 RawChannel::IOResult
RawChannelPosix::ScheduleRead() {
187 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
188 DCHECK(!pending_read_
);
190 pending_read_
= true;
195 embedder::ScopedPlatformHandleVectorPtr
RawChannelPosix::GetReadPlatformHandles(
196 size_t num_platform_handles
,
197 const void* /*platform_handle_table*/) {
198 DCHECK_GT(num_platform_handles
, 0u);
200 if (read_platform_handles_
.size() < num_platform_handles
) {
201 embedder::CloseAllPlatformHandles(&read_platform_handles_
);
202 read_platform_handles_
.clear();
203 return embedder::ScopedPlatformHandleVectorPtr();
206 embedder::ScopedPlatformHandleVectorPtr
rv(
207 new embedder::PlatformHandleVector(num_platform_handles
));
208 rv
->assign(read_platform_handles_
.begin(),
209 read_platform_handles_
.begin() + num_platform_handles
);
210 read_platform_handles_
.erase(
211 read_platform_handles_
.begin(),
212 read_platform_handles_
.begin() + num_platform_handles
);
216 RawChannel::IOResult
RawChannelPosix::WriteNoLock(
217 size_t* platform_handles_written
,
218 size_t* bytes_written
) {
219 write_lock().AssertAcquired();
221 DCHECK(!pending_write_
);
223 size_t num_platform_handles
= 0;
224 ssize_t write_result
;
225 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
226 embedder::PlatformHandle
* platform_handles
;
227 void* serialization_data
; // Actually unused.
228 write_buffer_no_lock()->GetPlatformHandlesToSend(
229 &num_platform_handles
, &platform_handles
, &serialization_data
);
230 DCHECK_GT(num_platform_handles
, 0u);
231 DCHECK_LE(num_platform_handles
, embedder::kPlatformChannelMaxNumHandles
);
232 DCHECK(platform_handles
);
234 // TODO(vtl): Reduce code duplication. (This is duplicated from below.)
235 std::vector
<WriteBuffer::Buffer
> buffers
;
236 write_buffer_no_lock()->GetBuffers(&buffers
);
237 DCHECK(!buffers
.empty());
238 const size_t kMaxBufferCount
= 10;
239 iovec iov
[kMaxBufferCount
];
240 size_t buffer_count
= std::min(buffers
.size(), kMaxBufferCount
);
241 for (size_t i
= 0; i
< buffer_count
; ++i
) {
242 iov
[i
].iov_base
= const_cast<char*>(buffers
[i
].addr
);
243 iov
[i
].iov_len
= buffers
[i
].size
;
246 write_result
= embedder::PlatformChannelSendmsgWithHandles(
247 fd_
.get(), iov
, buffer_count
, platform_handles
, num_platform_handles
);
248 for (size_t i
= 0; i
< num_platform_handles
; i
++)
249 platform_handles
[i
].CloseIfNecessary();
251 std::vector
<WriteBuffer::Buffer
> buffers
;
252 write_buffer_no_lock()->GetBuffers(&buffers
);
253 DCHECK(!buffers
.empty());
255 if (buffers
.size() == 1) {
256 write_result
= embedder::PlatformChannelWrite(
257 fd_
.get(), buffers
[0].addr
, buffers
[0].size
);
259 const size_t kMaxBufferCount
= 10;
260 iovec iov
[kMaxBufferCount
];
261 size_t buffer_count
= std::min(buffers
.size(), kMaxBufferCount
);
262 for (size_t i
= 0; i
< buffer_count
; ++i
) {
263 iov
[i
].iov_base
= const_cast<char*>(buffers
[i
].addr
);
264 iov
[i
].iov_len
= buffers
[i
].size
;
268 embedder::PlatformChannelWritev(fd_
.get(), iov
, buffer_count
);
272 if (write_result
>= 0) {
273 *platform_handles_written
= num_platform_handles
;
274 *bytes_written
= static_cast<size_t>(write_result
);
279 return IO_FAILED_SHUTDOWN
;
281 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
282 PLOG(WARNING
) << "sendmsg/write/writev";
283 return IO_FAILED_UNKNOWN
;
286 return ScheduleWriteNoLock();
289 RawChannel::IOResult
RawChannelPosix::ScheduleWriteNoLock() {
290 write_lock().AssertAcquired();
292 DCHECK(!pending_write_
);
294 // Set up to wait for the FD to become writable.
295 // If we're not on the I/O thread, we have to post a task to do this.
296 if (base::MessageLoop::current() != message_loop_for_io()) {
297 message_loop_for_io()->PostTask(FROM_HERE
,
298 base::Bind(&RawChannelPosix::WaitToWrite
,
299 weak_ptr_factory_
.GetWeakPtr()));
300 pending_write_
= true;
304 if (message_loop_for_io()->WatchFileDescriptor(
307 base::MessageLoopForIO::WATCH_WRITE
,
308 write_watcher_
.get(),
310 pending_write_
= true;
314 return IO_FAILED_UNKNOWN
;
317 bool RawChannelPosix::OnInit() {
318 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
320 DCHECK(!read_watcher_
);
321 read_watcher_
.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
322 DCHECK(!write_watcher_
);
323 write_watcher_
.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
325 if (!message_loop_for_io()->WatchFileDescriptor(
328 base::MessageLoopForIO::WATCH_READ
,
331 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
332 // (in the sense of returning the message loop's state to what it was before
334 read_watcher_
.reset();
335 write_watcher_
.reset();
342 void RawChannelPosix::OnShutdownNoLock(
343 scoped_ptr
<ReadBuffer
> /*read_buffer*/,
344 scoped_ptr
<WriteBuffer
> /*write_buffer*/) {
345 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
346 write_lock().AssertAcquired();
348 read_watcher_
.reset(); // This will stop watching (if necessary).
349 write_watcher_
.reset(); // This will stop watching (if necessary).
351 pending_read_
= false;
352 pending_write_
= false;
354 DCHECK(fd_
.is_valid());
357 weak_ptr_factory_
.InvalidateWeakPtrs();
360 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd
) {
361 DCHECK_EQ(fd
, fd_
.get().fd
);
362 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
364 if (!pending_read_
) {
369 pending_read_
= false;
370 size_t bytes_read
= 0;
371 IOResult io_result
= Read(&bytes_read
);
372 if (io_result
!= IO_PENDING
)
373 OnReadCompleted(io_result
, bytes_read
);
375 // On failure, |read_watcher_| must have been reset; on success,
376 // we assume that |OnReadCompleted()| always schedules another read.
377 // Otherwise, we could end up spinning -- getting
378 // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual
380 // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't
381 // schedule a new read. But that code won't be reached under the current
382 // RawChannel implementation.
383 DCHECK(!read_watcher_
|| pending_read_
);
386 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd
) {
387 DCHECK_EQ(fd
, fd_
.get().fd
);
388 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
391 size_t platform_handles_written
= 0;
392 size_t bytes_written
= 0;
394 base::AutoLock
locker(write_lock());
396 DCHECK(pending_write_
);
398 pending_write_
= false;
399 io_result
= WriteNoLock(&platform_handles_written
, &bytes_written
);
402 if (io_result
!= IO_PENDING
)
403 OnWriteCompleted(io_result
, platform_handles_written
, bytes_written
);
406 RawChannel::IOResult
RawChannelPosix::ReadImpl(size_t* bytes_read
) {
407 char* buffer
= nullptr;
408 size_t bytes_to_read
= 0;
409 read_buffer()->GetBuffer(&buffer
, &bytes_to_read
);
411 size_t old_num_platform_handles
= read_platform_handles_
.size();
412 ssize_t read_result
= embedder::PlatformChannelRecvmsg(
413 fd_
.get(), buffer
, bytes_to_read
, &read_platform_handles_
);
414 if (read_platform_handles_
.size() > old_num_platform_handles
) {
415 DCHECK_LE(read_platform_handles_
.size() - old_num_platform_handles
,
416 embedder::kPlatformChannelMaxNumHandles
);
418 // We should never accumulate more than |TransportData::kMaxPlatformHandles
419 // + embedder::kPlatformChannelMaxNumHandles| handles. (The latter part is
420 // possible because we could have accumulated all the handles for a message,
421 // then received the message data plus the first set of handles for the next
422 // message in the subsequent |recvmsg()|.)
423 if (read_platform_handles_
.size() >
424 (TransportData::kMaxPlatformHandles
+
425 embedder::kPlatformChannelMaxNumHandles
)) {
426 LOG(ERROR
) << "Received too many platform handles";
427 embedder::CloseAllPlatformHandles(&read_platform_handles_
);
428 read_platform_handles_
.clear();
429 return IO_FAILED_UNKNOWN
;
433 if (read_result
> 0) {
434 *bytes_read
= static_cast<size_t>(read_result
);
438 // |read_result == 0| means "end of file".
439 if (read_result
== 0)
440 return IO_FAILED_SHUTDOWN
;
442 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)
443 return ScheduleRead();
445 if (errno
== ECONNRESET
)
446 return IO_FAILED_BROKEN
;
448 PLOG(WARNING
) << "recvmsg";
449 return IO_FAILED_UNKNOWN
;
452 void RawChannelPosix::WaitToWrite() {
453 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
455 DCHECK(write_watcher_
);
457 if (!message_loop_for_io()->WatchFileDescriptor(
460 base::MessageLoopForIO::WATCH_WRITE
,
461 write_watcher_
.get(),
464 base::AutoLock
locker(write_lock());
466 DCHECK(pending_write_
);
467 pending_write_
= false;
469 OnWriteCompleted(IO_FAILED_UNKNOWN
, 0, 0);
475 // -----------------------------------------------------------------------------
477 // Static factory method declared in raw_channel.h.
479 scoped_ptr
<RawChannel
> RawChannel::Create(
480 embedder::ScopedPlatformHandle handle
) {
481 return make_scoped_ptr(new RawChannelPosix(handle
.Pass()));
484 } // namespace system