cc: Add an ACTION_DRAW_AND_SWAP_ABORT to help clear the pipeline
[chromium-blink-merge.git] / ipc / ipc_channel_proxy.cc
blob45512f97e1813e916d54f09fdfaac1279a82fd15
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/bind.h"
6 #include "base/compiler_specific.h"
7 #include "base/debug/trace_event.h"
8 #include "base/location.h"
9 #include "base/memory/ref_counted.h"
10 #include "base/memory/scoped_ptr.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/thread_task_runner_handle.h"
13 #include "ipc/ipc_channel_proxy.h"
14 #include "ipc/ipc_listener.h"
15 #include "ipc/ipc_logging.h"
16 #include "ipc/ipc_message_macros.h"
17 #include "ipc/ipc_message_utils.h"
19 namespace IPC {
21 //------------------------------------------------------------------------------
23 ChannelProxy::MessageFilter::MessageFilter() {}
25 void ChannelProxy::MessageFilter::OnFilterAdded(Channel* channel) {}
27 void ChannelProxy::MessageFilter::OnFilterRemoved() {}
29 void ChannelProxy::MessageFilter::OnChannelConnected(int32 peer_pid) {}
31 void ChannelProxy::MessageFilter::OnChannelError() {}
33 void ChannelProxy::MessageFilter::OnChannelClosing() {}
35 bool ChannelProxy::MessageFilter::OnMessageReceived(const Message& message) {
36 return false;
39 void ChannelProxy::MessageFilter::OnDestruct() const {
40 delete this;
43 ChannelProxy::MessageFilter::~MessageFilter() {}
45 //------------------------------------------------------------------------------
47 ChannelProxy::Context::Context(Listener* listener,
48 base::SingleThreadTaskRunner* ipc_task_runner)
49 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
50 listener_(listener),
51 ipc_task_runner_(ipc_task_runner),
52 channel_connected_called_(false),
53 peer_pid_(base::kNullProcessId) {
54 DCHECK(ipc_task_runner_.get());
57 ChannelProxy::Context::~Context() {
60 void ChannelProxy::Context::ClearIPCTaskRunner() {
61 ipc_task_runner_ = NULL;
64 void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
65 const Channel::Mode& mode) {
66 DCHECK(channel_.get() == NULL);
67 channel_id_ = handle.name;
68 channel_.reset(new Channel(handle, mode, this));
71 bool ChannelProxy::Context::TryFilters(const Message& message) {
72 #ifdef IPC_MESSAGE_LOG_ENABLED
73 Logging* logger = Logging::GetInstance();
74 if (logger->Enabled())
75 logger->OnPreDispatchMessage(message);
76 #endif
78 for (size_t i = 0; i < filters_.size(); ++i) {
79 if (filters_[i]->OnMessageReceived(message)) {
80 #ifdef IPC_MESSAGE_LOG_ENABLED
81 if (logger->Enabled())
82 logger->OnPostDispatchMessage(message, channel_id_);
83 #endif
84 return true;
87 return false;
90 // Called on the IPC::Channel thread
91 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
92 // First give a chance to the filters to process this message.
93 if (!TryFilters(message))
94 OnMessageReceivedNoFilter(message);
95 return true;
98 // Called on the IPC::Channel thread
99 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
100 // NOTE: This code relies on the listener's message loop not going away while
101 // this thread is active. That should be a reasonable assumption, but it
102 // feels risky. We may want to invent some more indirect way of referring to
103 // a MessageLoop if this becomes a problem.
104 listener_task_runner_->PostTask(
105 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
106 return true;
109 // Called on the IPC::Channel thread
110 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
111 // Add any pending filters. This avoids a race condition where someone
112 // creates a ChannelProxy, calls AddFilter, and then right after starts the
113 // peer process. The IO thread could receive a message before the task to add
114 // the filter is run on the IO thread.
115 OnAddFilter();
117 // We cache off the peer_pid so it can be safely accessed from both threads.
118 peer_pid_ = channel_->peer_pid();
119 for (size_t i = 0; i < filters_.size(); ++i)
120 filters_[i]->OnChannelConnected(peer_pid);
122 // See above comment about using listener_task_runner_ here.
123 listener_task_runner_->PostTask(
124 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
127 // Called on the IPC::Channel thread
128 void ChannelProxy::Context::OnChannelError() {
129 for (size_t i = 0; i < filters_.size(); ++i)
130 filters_[i]->OnChannelError();
132 // See above comment about using listener_task_runner_ here.
133 listener_task_runner_->PostTask(
134 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
137 // Called on the IPC::Channel thread
138 void ChannelProxy::Context::OnChannelOpened() {
139 DCHECK(channel_ != NULL);
141 // Assume a reference to ourselves on behalf of this thread. This reference
142 // will be released when we are closed.
143 AddRef();
145 if (!channel_->Connect()) {
146 OnChannelError();
147 return;
150 for (size_t i = 0; i < filters_.size(); ++i)
151 filters_[i]->OnFilterAdded(channel_.get());
154 // Called on the IPC::Channel thread
155 void ChannelProxy::Context::OnChannelClosed() {
156 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
157 // would result in this branch being taken.
158 if (!channel_.get())
159 return;
161 for (size_t i = 0; i < filters_.size(); ++i) {
162 filters_[i]->OnChannelClosing();
163 filters_[i]->OnFilterRemoved();
166 // We don't need the filters anymore.
167 filters_.clear();
169 channel_.reset();
171 // Balance with the reference taken during startup. This may result in
172 // self-destruction.
173 Release();
176 void ChannelProxy::Context::Clear() {
177 listener_ = NULL;
180 // Called on the IPC::Channel thread
181 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
182 if (!channel_.get()) {
183 OnChannelClosed();
184 return;
186 if (!channel_->Send(message.release()))
187 OnChannelError();
190 // Called on the IPC::Channel thread
191 void ChannelProxy::Context::OnAddFilter() {
192 std::vector<scoped_refptr<MessageFilter> > new_filters;
194 base::AutoLock auto_lock(pending_filters_lock_);
195 new_filters.swap(pending_filters_);
198 for (size_t i = 0; i < new_filters.size(); ++i) {
199 filters_.push_back(new_filters[i]);
201 // If the channel has already been created, then we need to send this
202 // message so that the filter gets access to the Channel.
203 if (channel_.get())
204 new_filters[i]->OnFilterAdded(channel_.get());
205 // Ditto for if the channel has been connected.
206 if (peer_pid_)
207 new_filters[i]->OnChannelConnected(peer_pid_);
211 // Called on the IPC::Channel thread
212 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
213 if (!channel_.get())
214 return; // The filters have already been deleted.
216 for (size_t i = 0; i < filters_.size(); ++i) {
217 if (filters_[i].get() == filter) {
218 filter->OnFilterRemoved();
219 filters_.erase(filters_.begin() + i);
220 return;
224 NOTREACHED() << "filter to be removed not found";
227 // Called on the listener's thread
228 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
229 base::AutoLock auto_lock(pending_filters_lock_);
230 pending_filters_.push_back(make_scoped_refptr(filter));
231 ipc_task_runner_->PostTask(
232 FROM_HERE, base::Bind(&Context::OnAddFilter, this));
235 // Called on the listener's thread
236 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
237 #ifdef IPC_MESSAGE_LOG_ENABLED
238 Logging* logger = Logging::GetInstance();
239 std::string name;
240 logger->GetMessageText(message.type(), &name, &message, NULL);
241 TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage",
242 "name", name);
243 #else
244 TRACE_EVENT2("task", "ChannelProxy::Context::OnDispatchMessage",
245 "class", IPC_MESSAGE_ID_CLASS(message.type()),
246 "line", IPC_MESSAGE_ID_LINE(message.type()));
247 #endif
249 if (!listener_)
250 return;
252 OnDispatchConnected();
254 #ifdef IPC_MESSAGE_LOG_ENABLED
255 if (message.type() == IPC_LOGGING_ID) {
256 logger->OnReceivedLoggingMessage(message);
257 return;
260 if (logger->Enabled())
261 logger->OnPreDispatchMessage(message);
262 #endif
264 listener_->OnMessageReceived(message);
266 #ifdef IPC_MESSAGE_LOG_ENABLED
267 if (logger->Enabled())
268 logger->OnPostDispatchMessage(message, channel_id_);
269 #endif
272 // Called on the listener's thread
273 void ChannelProxy::Context::OnDispatchConnected() {
274 if (channel_connected_called_)
275 return;
277 channel_connected_called_ = true;
278 if (listener_)
279 listener_->OnChannelConnected(peer_pid_);
282 // Called on the listener's thread
283 void ChannelProxy::Context::OnDispatchError() {
284 if (listener_)
285 listener_->OnChannelError();
288 //-----------------------------------------------------------------------------
290 ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle,
291 Channel::Mode mode,
292 Listener* listener,
293 base::SingleThreadTaskRunner* ipc_task_runner)
294 : context_(new Context(listener, ipc_task_runner)),
295 outgoing_message_filter_(NULL),
296 did_init_(false) {
297 Init(channel_handle, mode, true);
300 ChannelProxy::ChannelProxy(Context* context)
301 : context_(context),
302 outgoing_message_filter_(NULL),
303 did_init_(false) {
306 ChannelProxy::~ChannelProxy() {
307 DCHECK(CalledOnValidThread());
309 Close();
312 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
313 Channel::Mode mode,
314 bool create_pipe_now) {
315 DCHECK(CalledOnValidThread());
316 DCHECK(!did_init_);
317 #if defined(OS_POSIX)
318 // When we are creating a server on POSIX, we need its file descriptor
319 // to be created immediately so that it can be accessed and passed
320 // to other processes. Forcing it to be created immediately avoids
321 // race conditions that may otherwise arise.
322 if (mode & Channel::MODE_SERVER_FLAG) {
323 create_pipe_now = true;
325 #endif // defined(OS_POSIX)
327 if (create_pipe_now) {
328 // Create the channel immediately. This effectively sets up the
329 // low-level pipe so that the client can connect. Without creating
330 // the pipe immediately, it is possible for a listener to attempt
331 // to connect and get an error since the pipe doesn't exist yet.
332 context_->CreateChannel(channel_handle, mode);
333 } else {
334 context_->ipc_task_runner()->PostTask(
335 FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
336 channel_handle, mode));
339 // complete initialization on the background thread
340 context_->ipc_task_runner()->PostTask(
341 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
343 did_init_ = true;
346 void ChannelProxy::Close() {
347 DCHECK(CalledOnValidThread());
349 // Clear the backpointer to the listener so that any pending calls to
350 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
351 // possible that the channel could be closed while it is receiving messages!
352 context_->Clear();
354 if (context_->ipc_task_runner()) {
355 context_->ipc_task_runner()->PostTask(
356 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
360 bool ChannelProxy::Send(Message* message) {
361 DCHECK(did_init_);
363 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
364 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
365 if (outgoing_message_filter())
366 message = outgoing_message_filter()->Rewrite(message);
368 #ifdef IPC_MESSAGE_LOG_ENABLED
369 Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
370 #endif
372 context_->ipc_task_runner()->PostTask(
373 FROM_HERE,
374 base::Bind(&ChannelProxy::Context::OnSendMessage,
375 context_, base::Passed(scoped_ptr<Message>(message))));
376 return true;
379 void ChannelProxy::AddFilter(MessageFilter* filter) {
380 DCHECK(CalledOnValidThread());
382 context_->AddFilter(filter);
385 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
386 DCHECK(CalledOnValidThread());
388 context_->ipc_task_runner()->PostTask(
389 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
390 make_scoped_refptr(filter)));
393 void ChannelProxy::ClearIPCTaskRunner() {
394 DCHECK(CalledOnValidThread());
396 context()->ClearIPCTaskRunner();
399 #if defined(OS_POSIX) && !defined(OS_NACL)
400 // See the TODO regarding lazy initialization of the channel in
401 // ChannelProxy::Init().
402 int ChannelProxy::GetClientFileDescriptor() {
403 DCHECK(CalledOnValidThread());
405 Channel* channel = context_.get()->channel_.get();
406 // Channel must have been created first.
407 DCHECK(channel) << context_.get()->channel_id_;
408 return channel->GetClientFileDescriptor();
411 int ChannelProxy::TakeClientFileDescriptor() {
412 DCHECK(CalledOnValidThread());
414 Channel* channel = context_.get()->channel_.get();
415 // Channel must have been created first.
416 DCHECK(channel) << context_.get()->channel_id_;
417 return channel->TakeClientFileDescriptor();
420 bool ChannelProxy::GetPeerEuid(uid_t* peer_euid) const {
421 DCHECK(CalledOnValidThread());
423 Channel* channel = context_.get()->channel_.get();
424 // Channel must have been created first.
425 DCHECK(channel) << context_.get()->channel_id_;
426 return channel->GetPeerEuid(peer_euid);
428 #endif
430 //-----------------------------------------------------------------------------
432 } // namespace IPC