1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_proxy.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/profiler/scoped_tracker.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "ipc/ipc_channel_factory.h"
16 #include "ipc/ipc_listener.h"
17 #include "ipc/ipc_logging.h"
18 #include "ipc/ipc_message_macros.h"
19 #include "ipc/message_filter.h"
20 #include "ipc/message_filter_router.h"
24 //------------------------------------------------------------------------------
// Constructs the Context. The listener task runner is taken from
// base::ThreadTaskRunnerHandle::Get() and |ipc_task_runner| is stored as the
// IPC task runner. The connected / thread-safe-send flags start out false, a
// fresh MessageFilterRouter is allocated, the peer pid starts as
// base::kNullProcessId and the attachment-broker-endpoint flag starts false.
// NOTE(review): parts of this definition (e.g. the declaration of the
// |listener| name used by the DCHECK below, and the closing brace) are not
// visible in this extract.
26 ChannelProxy::Context::Context(
28 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
)
29 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
31 ipc_task_runner_(ipc_task_runner
),
32 channel_connected_called_(false),
33 channel_send_thread_safe_(false),
34 message_filter_router_(new MessageFilterRouter()),
35 peer_pid_(base::kNullProcessId
),
36 attachment_broker_endpoint_(false) {
// An IPC task runner is mandatory.
37 DCHECK(ipc_task_runner_
.get());
38 // The Listener thread where Messages are handled must be a separate thread
39 // to avoid oversubscribing the IO thread. If you trigger this error, you
// need to either:
41 // 1) Create the ChannelProxy on a different thread, or
42 // 2) Just use Channel
43 // Note, we currently make an exception for a NULL listener. That usage
44 // basically works, but is outside the intent of ChannelProxy. This support
45 // will disappear, so please don't rely on it. See crbug.com/364241
// Passes when there is no listener, or when the IPC and listener task
// runners are different objects.
46 DCHECK(!listener
|| (ipc_task_runner_
.get() != listener_task_runner_
.get()));
// Destructor for ChannelProxy::Context.
// NOTE(review): the body and closing brace are not visible in this extract.
49 ChannelProxy::Context::~Context() {
// Resets ipc_task_runner_ to NULL so this Context no longer refers to the IPC
// task runner.
// NOTE(review): the closing brace is not visible in this extract.
52 void ChannelProxy::Context::ClearIPCTaskRunner() {
53 ipc_task_runner_
= NULL
;
// Builds the underlying channel from |factory| while holding
// channel_lifetime_lock_.
//
// Steps performed by the visible lines:
//   - channel_id_ is set from factory->GetName();
//   - channel_ is built with this Context passed to BuildChannel();
//   - channel_send_thread_safe_ caches channel_->IsSendThreadSafe();
//   - the attachment_broker_endpoint_ flag is forwarded to the new channel.
// NOTE(review): at least one original line (between the lock and the
// channel_id_ assignment) and the closing brace are not visible here.
56 void ChannelProxy::Context::CreateChannel(scoped_ptr
<ChannelFactory
> factory
) {
57 base::AutoLock
l(channel_lifetime_lock_
);
59 channel_id_
= factory
->GetName();
60 channel_
= factory
->BuildChannel(this);
61 channel_send_thread_safe_
= channel_
->IsSendThreadSafe();
62 channel_
->SetAttachmentBrokerEndpoint(attachment_broker_endpoint_
);
// Offers |message| to the message filter router.
//
// When the router reports that a filter handled the message and the message
// carries a dispatch error, OnDispatchBadMessage is posted to the listener
// task runner. When IPC_MESSAGE_LOG_ENABLED is defined, the logger (if
// enabled) is told about the message before and after filter dispatch.
// NOTE(review): the return statements, #endif lines and closing braces are
// not visible in this extract, so the exact return values cannot be confirmed
// from here.
65 bool ChannelProxy::Context::TryFilters(const Message
& message
) {
66 DCHECK(message_filter_router_
);
67 #ifdef IPC_MESSAGE_LOG_ENABLED
68 Logging
* logger
= Logging::GetInstance();
69 if (logger
->Enabled())
70 logger
->OnPreDispatchMessage(message
);
73 if (message_filter_router_
->TryFilters(message
)) {
// A filter consumed the message; report a bad message to the listener's
// task runner if dispatch flagged an error.
74 if (message
.dispatch_error()) {
75 listener_task_runner_
->PostTask(
76 FROM_HERE
, base::Bind(&Context::OnDispatchBadMessage
, this, message
));
78 #ifdef IPC_MESSAGE_LOG_ENABLED
79 if (logger
->Enabled())
80 logger
->OnPostDispatchMessage(message
, channel_id_
);
// Handles an incoming |message|: filters are tried first, and only when
// TryFilters() returns false is the message passed on to
// OnMessageReceivedNoFilter().
// NOTE(review): the return statement and closing brace are not visible in
// this extract.
87 // Called on the IPC::Channel thread
88 bool ChannelProxy::Context::OnMessageReceived(const Message
& message
) {
89 // First give a chance to the filters to process this message.
90 if (!TryFilters(message
))
91 OnMessageReceivedNoFilter(message
);
// Posts OnDispatchMessage for |message| to the listener task runner without
// consulting the filters.
// NOTE(review): the return statement and closing brace are not visible in
// this extract.
95 // Called on the IPC::Channel thread
96 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message
& message
) {
97 listener_task_runner_
->PostTask(
98 FROM_HERE
, base::Bind(&Context::OnDispatchMessage
, this, message
));
// Records the peer pid and notifies the listener side of the connection.
//
// In the visible lines the cached value comes from channel_->GetPeerPID();
// the |peer_pid| argument itself is not read. OnDispatchConnected is then
// posted to the listener task runner.
// NOTE(review): the statement(s) that follow the "Add any pending filters"
// comment are not visible in this extract, nor is the closing brace.
102 // Called on the IPC::Channel thread
103 void ChannelProxy::Context::OnChannelConnected(int32_t peer_pid
) {
104 // We cache off the peer_pid so it can be safely accessed from both threads.
105 peer_pid_
= channel_
->GetPeerPID();
107 // Add any pending filters. This avoids a race condition where someone
108 // creates a ChannelProxy, calls AddFilter, and then right after starts the
109 // peer process. The IO thread could receive a message before the task to add
110 // the filter is run on the IO thread.
113 // See above comment about using listener_task_runner_ here.
114 listener_task_runner_
->PostTask(
115 FROM_HERE
, base::Bind(&Context::OnDispatchConnected
, this));
// Propagates a channel error: every filter in filters_ gets
// OnChannelError(), then OnDispatchError is posted to the listener task
// runner.
// NOTE(review): the closing brace is not visible in this extract.
118 // Called on the IPC::Channel thread
119 void ChannelProxy::Context::OnChannelError() {
120 for (size_t i
= 0; i
< filters_
.size(); ++i
)
121 filters_
[i
]->OnChannelError();
123 // See above comment about using listener_task_runner_ here.
124 listener_task_runner_
->PostTask(
125 FROM_HERE
, base::Bind(&Context::OnDispatchError
, this));
// Completes channel start-up: requires a non-NULL channel_, attempts
// channel_->Connect(), and calls OnFilterAdded(channel_.get()) on each filter
// in filters_.
// NOTE(review): the statement that takes the self-reference described in the
// comment below, the handling of a failed Connect(), and the closing braces
// are not visible in this extract.
128 // Called on the IPC::Channel thread
129 void ChannelProxy::Context::OnChannelOpened() {
130 DCHECK(channel_
!= NULL
);
132 // Assume a reference to ourselves on behalf of this thread. This reference
133 // will be released when we are closed.
136 if (!channel_
->Connect()) {
141 for (size_t i
= 0; i
< filters_
.size(); ++i
)
142 filters_
[i
]->OnFilterAdded(channel_
.get());
// Tears down filter state when the channel closes.
//
// Visible behavior: a ScopedTracker is created for profiling (crbug 477117);
// each filter in filters_ receives OnChannelClosing() followed by
// OnFilterRemoved(); the message filter router is cleared; and
// pending_filters_ is cleared without taking its lock.
// NOTE(review): the branch referred to by the "called more than once"
// comment, the statements after the final comment, and the closing braces
// are not visible in this extract.
145 // Called on the IPC::Channel thread
146 void ChannelProxy::Context::OnChannelClosed() {
147 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
148 tracked_objects::ScopedTracker
tracking_profile(
149 FROM_HERE_WITH_EXPLICIT_FUNCTION(
150 "477117 ChannelProxy::Context::OnChannelClosed"));
151 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
152 // would result in this branch being taken.
156 for (size_t i
= 0; i
< filters_
.size(); ++i
) {
157 filters_
[i
]->OnChannelClosing();
158 filters_
[i
]->OnFilterRemoved();
161 // We don't need the filters anymore.
162 message_filter_router_
->Clear();
164 // We don't need the lock, because at this point, the listener thread can't
165 // access it any more.
166 pending_filters_
.clear();
170 // Balance with the reference taken during startup. This may result in
// Context::Clear().
// NOTE(review): the body and closing brace are not visible in this extract,
// so what gets cleared cannot be confirmed from here.
175 void ChannelProxy::Context::Clear() {
// Sends |message| through channel_. Ownership leaves the scoped_ptr via
// release() and the raw pointer is handed to channel_->Send().
// A ScopedTracker is created for profiling (crbug 477117).
// NOTE(review): any checks performed before the Send() call, the statement
// executed when Send() returns false, and the closing brace are not visible
// in this extract.
179 // Called on the IPC::Channel thread
180 void ChannelProxy::Context::OnSendMessage(scoped_ptr
<Message
> message
) {
181 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
182 tracked_objects::ScopedTracker
tracking_profile(
183 FROM_HERE_WITH_EXPLICIT_FUNCTION(
184 "477117 ChannelProxy::Context::OnSendMessage"));
190 if (!channel_
->Send(message
.release()))
// Promotes pending filters to active filters.
//
// Visible behavior: peer_pid_ is compared against base::kNullProcessId
// (presumably an early return while unconnected - the statement itself is
// not visible); pending_filters_ is swapped into a local vector under
// pending_filters_lock_; then each new filter is appended to filters_,
// registered with the message filter router, and told about the channel via
// OnFilterAdded(channel_.get()) and OnChannelConnected(peer_pid_).
// NOTE(review): several original lines, including closing braces, are not
// visible in this extract.
194 // Called on the IPC::Channel thread
195 void ChannelProxy::Context::OnAddFilter() {
196 // Our OnChannelConnected method has not yet been called, so we can't be
197 // sure that channel_ is valid yet. When OnChannelConnected *is* called,
198 // it invokes OnAddFilter, so any pending filter(s) will be added at that
// time.
200 if (peer_pid_
== base::kNullProcessId
)
203 std::vector
<scoped_refptr
<MessageFilter
> > new_filters
;
// Take the whole pending list in one step while the lock is held.
205 base::AutoLock
auto_lock(pending_filters_lock_
);
206 new_filters
.swap(pending_filters_
);
209 for (size_t i
= 0; i
< new_filters
.size(); ++i
) {
210 filters_
.push_back(new_filters
[i
]);
212 message_filter_router_
->AddFilter(new_filters
[i
].get());
214 // The channel has already been created and connected, so we need to
215 // inform the filters right now.
216 new_filters
[i
]->OnFilterAdded(channel_
.get());
217 new_filters
[i
]->OnChannelConnected(peer_pid_
);
// Removes |filter| from whichever list currently holds it.
//
// While peer_pid_ is still base::kNullProcessId, pending_filters_ is searched
// under pending_filters_lock_. Otherwise the filter is removed from the
// message filter router and searched for in filters_. In both searches the
// matching entry gets OnFilterRemoved() before being erased. The NOTREACHED()
// at the end reports a filter that was not found.
// NOTE(review): the return statements after each erase, the condition
// guarding the "already been deleted" return, and the closing braces are not
// visible in this extract.
221 // Called on the IPC::Channel thread
222 void ChannelProxy::Context::OnRemoveFilter(MessageFilter
* filter
) {
223 if (peer_pid_
== base::kNullProcessId
) {
224 // The channel is not yet connected, so any filters are still pending.
225 base::AutoLock
auto_lock(pending_filters_lock_
);
226 for (size_t i
= 0; i
< pending_filters_
.size(); ++i
) {
227 if (pending_filters_
[i
].get() == filter
) {
228 filter
->OnFilterRemoved();
229 pending_filters_
.erase(pending_filters_
.begin() + i
);
236 return; // The filters have already been deleted.
238 message_filter_router_
->RemoveFilter(filter
);
240 for (size_t i
= 0; i
< filters_
.size(); ++i
) {
241 if (filters_
[i
].get() == filter
) {
242 filter
->OnFilterRemoved();
243 filters_
.erase(filters_
.begin() + i
);
248 NOTREACHED() << "filter to be removed not found";
// Queues |filter| for installation: it is appended to pending_filters_
// (wrapped by make_scoped_refptr) under pending_filters_lock_, and
// OnAddFilter is posted to the IPC task runner.
// NOTE(review): the closing brace is not visible in this extract.
251 // Called on the listener's thread
252 void ChannelProxy::Context::AddFilter(MessageFilter
* filter
) {
253 base::AutoLock
auto_lock(pending_filters_lock_
);
254 pending_filters_
.push_back(make_scoped_refptr(filter
));
255 ipc_task_runner_
->PostTask(
256 FROM_HERE
, base::Bind(&Context::OnAddFilter
, this));
// Delivers |message| to listener_.
//
// Visible behavior: a trace event named
// "ChannelProxy::Context::OnDispatchMessage" is emitted (two variants are
// visible, one under IPC_MESSAGE_LOG_ENABLED); OnDispatchConnected() is
// called; the listener receives OnMessageReceived(message) and, if the
// message reports a dispatch error, OnBadMessageReceived(message). With
// IPC_MESSAGE_LOG_ENABLED, messages of type IPC_LOGGING_ID are handed to the
// logger and dispatch is bracketed by the logger's pre/post hooks.
// NOTE(review): many original lines are not visible in this extract (local
// declarations such as |name|, the rest of the TRACE_EVENT1 call, #else /
// #endif lines, any guard on listener_ or around OnDispatchConnected(),
// returns and closing braces), so control flow cannot be fully confirmed.
259 // Called on the listener's thread
260 void ChannelProxy::Context::OnDispatchMessage(const Message
& message
) {
261 #if defined(IPC_MESSAGE_LOG_ENABLED)
262 Logging
* logger
= Logging::GetInstance();
264 logger
->GetMessageText(message
.type(), &name
, &message
, NULL
);
265 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
268 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
269 "class", IPC_MESSAGE_ID_CLASS(message
.type()),
270 "line", IPC_MESSAGE_ID_LINE(message
.type()));
276 OnDispatchConnected();
278 #ifdef IPC_MESSAGE_LOG_ENABLED
279 if (message
.type() == IPC_LOGGING_ID
) {
280 logger
->OnReceivedLoggingMessage(message
);
284 if (logger
->Enabled())
285 logger
->OnPreDispatchMessage(message
);
288 listener_
->OnMessageReceived(message
);
289 if (message
.dispatch_error())
290 listener_
->OnBadMessageReceived(message
);
292 #ifdef IPC_MESSAGE_LOG_ENABLED
293 if (logger
->Enabled())
294 logger
->OnPostDispatchMessage(message
, channel_id_
);
// Tells listener_ that the channel is connected, passing the cached
// peer_pid_, and sets channel_connected_called_ to true.
// NOTE(review): the statement controlled by the channel_connected_called_
// check (presumably an early return so the listener is notified only once),
// any guard on listener_, and the closing brace are not visible in this
// extract.
298 // Called on the listener's thread
299 void ChannelProxy::Context::OnDispatchConnected() {
300 if (channel_connected_called_
)
303 channel_connected_called_
= true;
305 listener_
->OnChannelConnected(peer_pid_
);
// Forwards a channel error to listener_ via OnChannelError().
// NOTE(review): at least one original line before the call (possibly a guard
// on listener_) and the closing brace are not visible in this extract.
308 // Called on the listener's thread
309 void ChannelProxy::Context::OnDispatchError() {
311 listener_
->OnChannelError();
// Reports |message| to listener_ as a bad message via
// OnBadMessageReceived().
// NOTE(review): at least one original line before the call (possibly a guard
// on listener_) and the closing brace are not visible in this extract.
314 // Called on the listener's thread
315 void ChannelProxy::Context::OnDispatchBadMessage(const Message
& message
) {
317 listener_
->OnBadMessageReceived(message
);
// Acquires channel_lifetime_lock_ for the duration of the function.
// NOTE(review): the statement executed under the lock and the closing brace
// are not visible in this extract.
320 void ChannelProxy::Context::ClearChannel() {
321 base::AutoLock
l(channel_lifetime_lock_
);
// Sends |message| directly through channel_ on the calling thread while
// holding channel_lifetime_lock_. DCHECKs that the channel reports
// IsSendThreadSafe().
// NOTE(review): original lines between the lock and the DCHECK (possibly a
// check that channel_ exists) and the closing brace are not visible in this
// extract.
325 void ChannelProxy::Context::SendFromThisThread(Message
* message
) {
326 base::AutoLock
l(channel_lifetime_lock_
);
329 DCHECK(channel_
->IsSendThreadSafe());
330 channel_
->Send(message
);
// Sends |message| using one of two paths visible here: a direct
// SendFromThisThread() call when channel_send_thread_safe_ is set, and a
// posted OnSendMessage task on the IPC task runner with |message| wrapped in
// a scoped_ptr and transferred via base::Passed.
// NOTE(review): the lines separating the two paths (presumably a return
// after the direct send) and the closing braces are not visible in this
// extract.
333 void ChannelProxy::Context::Send(Message
* message
) {
334 if (channel_send_thread_safe_
) {
335 SendFromThisThread(message
);
339 ipc_task_runner()->PostTask(
340 FROM_HERE
, base::Bind(&ChannelProxy::Context::OnSendMessage
, this,
341 base::Passed(scoped_ptr
<Message
>(message
))));
// Returns the cached channel_send_thread_safe_ flag.
// NOTE(review): the closing brace is not visible in this extract.
344 bool ChannelProxy::Context::IsChannelSendThreadSafe() const {
345 return channel_send_thread_safe_
;
348 //-----------------------------------------------------------------------------
// Factory: allocates a ChannelProxy for |listener| and |ipc_task_runner|,
// initializes it from |channel_handle|, |mode| and |broker| with
// create_pipe_now set to true, and returns ownership to the caller.
// NOTE(review): the parameter declarations for |mode| and |listener| and the
// closing brace are not visible in this extract.
351 scoped_ptr
<ChannelProxy
> ChannelProxy::Create(
352 const IPC::ChannelHandle
& channel_handle
,
355 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
,
356 AttachmentBroker
* broker
) {
357 scoped_ptr
<ChannelProxy
> channel(new ChannelProxy(listener
, ipc_task_runner
));
358 channel
->Init(channel_handle
, mode
, true, broker
);
359 return channel
.Pass();
// Factory: allocates a ChannelProxy for |listener| and |ipc_task_runner|,
// initializes it from |factory| (ownership passed on) with create_pipe_now
// set to true, and returns ownership to the caller.
// NOTE(review): the parameter declaration for |listener| and the closing
// brace are not visible in this extract.
363 scoped_ptr
<ChannelProxy
> ChannelProxy::Create(
364 scoped_ptr
<ChannelFactory
> factory
,
366 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
) {
367 scoped_ptr
<ChannelProxy
> channel(new ChannelProxy(listener
, ipc_task_runner
));
368 channel
->Init(factory
.Pass(), true);
369 return channel
.Pass();
// Constructs a ChannelProxy from an existing |context|. When
// ENABLE_IPC_FUZZER is defined, outgoing_message_filter_ starts as NULL.
// NOTE(review): the member initializer list, the #endif and the closing
// brace are not visible in this extract.
372 ChannelProxy::ChannelProxy(Context
* context
)
375 #if defined(ENABLE_IPC_FUZZER)
376 outgoing_message_filter_
= NULL
;
// Constructs a ChannelProxy with a newly allocated Context built from
// |listener| and |ipc_task_runner|; did_init_ starts false. When
// ENABLE_IPC_FUZZER is defined, outgoing_message_filter_ starts as NULL.
// NOTE(review): the |listener| parameter declaration, the #endif and the
// closing brace are not visible in this extract.
380 ChannelProxy::ChannelProxy(
382 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
)
383 : context_(new Context(listener
, ipc_task_runner
)), did_init_(false) {
384 #if defined(ENABLE_IPC_FUZZER)
385 outgoing_message_filter_
= NULL
;
// Destructor. DCHECKs that it runs on the thread CalledOnValidThread()
// accepts.
// NOTE(review): the remainder of the body and the closing brace are not
// visible in this extract.
389 ChannelProxy::~ChannelProxy() {
390 DCHECK(CalledOnValidThread());
// Initializes the proxy from a channel handle by delegating to the factory
// overload of Init() with ChannelFactory::Create(channel_handle, mode,
// broker). On OS_POSIX builds, |create_pipe_now| is forced to true when
// |mode| has Channel::MODE_SERVER_FLAG set.
// NOTE(review): the |mode| parameter declaration and closing braces are not
// visible in this extract.
395 void ChannelProxy::Init(const IPC::ChannelHandle
& channel_handle
,
397 bool create_pipe_now
,
398 AttachmentBroker
* broker
) {
399 #if defined(OS_POSIX)
400 // When we are creating a server on POSIX, we need its file descriptor
401 // to be created immediately so that it can be accessed and passed
402 // to other processes. Forcing it to be created immediately avoids
403 // race conditions that may otherwise arise.
404 if (mode
& Channel::MODE_SERVER_FLAG
) {
405 create_pipe_now
= true;
407 #endif // defined(OS_POSIX)
408 Init(ChannelFactory::Create(channel_handle
, mode
, broker
), create_pipe_now
);
// Initializes the proxy from |factory|.
//
// Two ways of creating the channel are visible: a synchronous
// context_->CreateChannel() call when |create_pipe_now| is set, and a
// CreateChannel task posted to the IPC task runner (presumably the else
// branch - the branch keyword is not visible). Afterwards OnChannelOpened is
// posted to the IPC task runner to finish initialization there.
// NOTE(review): several original lines (after the thread DCHECK, the else
// branch, trailing statements and closing braces) are not visible in this
// extract.
411 void ChannelProxy::Init(scoped_ptr
<ChannelFactory
> factory
,
412 bool create_pipe_now
) {
413 DCHECK(CalledOnValidThread());
416 if (create_pipe_now
) {
417 // Create the channel immediately. This effectively sets up the
418 // low-level pipe so that the client can connect. Without creating
419 // the pipe immediately, it is possible for a listener to attempt
420 // to connect and get an error since the pipe doesn't exist yet.
421 context_
->CreateChannel(factory
.Pass());
423 context_
->ipc_task_runner()->PostTask(
424 FROM_HERE
, base::Bind(&Context::CreateChannel
,
425 context_
.get(), Passed(factory
.Pass())));
428 // complete initialization on the background thread
429 context_
->ipc_task_runner()->PostTask(
430 FROM_HERE
, base::Bind(&Context::OnChannelOpened
, context_
.get()));
// Closes the proxy. After the thread DCHECK, if the context still has an IPC
// task runner, Context::OnChannelClosed is posted to it.
// NOTE(review): the statement that clears the listener back-pointer
// (described by the comment below) and the closing braces are not visible in
// this extract.
436 void ChannelProxy::Close() {
437 DCHECK(CalledOnValidThread());
439 // Clear the backpointer to the listener so that any pending calls to
440 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
441 // possible that the channel could be closed while it is receiving messages!
444 if (context_
->ipc_task_runner()) {
445 context_
->ipc_task_runner()->PostTask(
446 FROM_HERE
, base::Bind(&Context::OnChannelClosed
, context_
.get()));
// Sends |message| via context_->Send().
//
// With ENABLE_IPC_FUZZER, an installed outgoing message filter may replace
// |message| through Rewrite(). With IPC_MESSAGE_LOG_ENABLED, the send is
// reported to the Logging singleton together with the channel id.
// NOTE(review): the #endif lines, the return statement and the closing brace
// are not visible in this extract, so the returned value cannot be confirmed
// from here.
450 bool ChannelProxy::Send(Message
* message
) {
453 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
454 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
456 #ifdef ENABLE_IPC_FUZZER
457 // In IPC fuzzing builds, it is possible to define a filter to apply to
458 // outgoing messages. It will either rewrite the message and return a new
459 // one, freeing the original, or return the message unchanged.
460 if (outgoing_message_filter())
461 message
= outgoing_message_filter()->Rewrite(message
);
464 #ifdef IPC_MESSAGE_LOG_ENABLED
465 Logging::GetInstance()->OnSendMessage(message
, context_
->channel_id());
468 context_
->Send(message
);
// Adds |filter| by forwarding to context_->AddFilter() after the thread
// DCHECK.
// NOTE(review): the closing brace is not visible in this extract.
472 void ChannelProxy::AddFilter(MessageFilter
* filter
) {
473 DCHECK(CalledOnValidThread());
475 context_
->AddFilter(filter
);
// Requests removal of |filter|: after the thread DCHECK, posts
// Context::OnRemoveFilter to the IPC task runner with |filter| wrapped by
// make_scoped_refptr.
// NOTE(review): the closing brace is not visible in this extract.
478 void ChannelProxy::RemoveFilter(MessageFilter
* filter
) {
479 DCHECK(CalledOnValidThread());
481 context_
->ipc_task_runner()->PostTask(
482 FROM_HERE
, base::Bind(&Context::OnRemoveFilter
, context_
.get(),
483 make_scoped_refptr(filter
)));
// Forwards to Context::ClearIPCTaskRunner() after the thread DCHECK.
// NOTE(review): the closing brace is not visible in this extract.
486 void ChannelProxy::ClearIPCTaskRunner() {
487 DCHECK(CalledOnValidThread());
489 context()->ClearIPCTaskRunner();
// Returns the peer process id cached on the context (context_->peer_pid_).
// NOTE(review): the closing brace is not visible in this extract.
492 base::ProcessId
ChannelProxy::GetPeerPID() const {
493 return context_
->peer_pid_
;
// Copies this proxy's is_attachment_broker_endpoint() value into the context
// via set_attachment_broker_endpoint().
// NOTE(review): the closing brace is not visible in this extract.
496 void ChannelProxy::OnSetAttachmentBrokerEndpoint() {
497 context()->set_attachment_broker_endpoint(is_attachment_broker_endpoint());
// Compiled only for OS_POSIX builds that are not OS_NACL_SFI.
// Returns the result of GetClientFileDescriptor() on the context's channel.
// DCHECKs the calling thread and that the channel exists; the channel id is
// streamed into the DCHECK message.
// NOTE(review): the closing brace is not visible in this extract.
500 #if defined(OS_POSIX) && !defined(OS_NACL_SFI)
501 // See the TODO regarding lazy initialization of the channel in
502 // ChannelProxy::Init().
503 int ChannelProxy::GetClientFileDescriptor() {
504 DCHECK(CalledOnValidThread());
506 Channel
* channel
= context_
.get()->channel_
.get();
507 // Channel must have been created first.
508 DCHECK(channel
) << context_
.get()->channel_id_
;
509 return channel
->GetClientFileDescriptor();
// Returns the base::ScopedFD produced by TakeClientFileDescriptor() on the
// context's channel. DCHECKs the calling thread and that the channel exists;
// the channel id is streamed into the DCHECK message.
// NOTE(review): the closing brace and the matching #endif are not visible in
// this extract.
512 base::ScopedFD
ChannelProxy::TakeClientFileDescriptor() {
513 DCHECK(CalledOnValidThread());
515 Channel
* channel
= context_
.get()->channel_
.get();
516 // Channel must have been created first.
517 DCHECK(channel
) << context_
.get()->channel_id_
;
518 return channel
->TakeClientFileDescriptor();
// ChannelProxy::OnChannelInit().
// NOTE(review): the body and closing brace are not visible in this extract.
522 void ChannelProxy::OnChannelInit() {
525 //-----------------------------------------------------------------------------