net: Add SetUploadStream method to URLFetcher.
[chromium-blink-merge.git] / ipc / ipc_channel_proxy.cc
blob37be200abc7a6ba254961ca23fd676ee580e27d1
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_proxy.h"
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "ipc/ipc_channel_factory.h"
15 #include "ipc/ipc_listener.h"
16 #include "ipc/ipc_logging.h"
17 #include "ipc/ipc_message_macros.h"
18 #include "ipc/message_filter.h"
19 #include "ipc/message_filter_router.h"
21 namespace IPC {
23 //------------------------------------------------------------------------------
25 ChannelProxy::Context::Context(
26 Listener* listener,
27 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
28 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
29 listener_(listener),
30 ipc_task_runner_(ipc_task_runner),
31 channel_connected_called_(false),
32 message_filter_router_(new MessageFilterRouter()),
33 peer_pid_(base::kNullProcessId) {
34 DCHECK(ipc_task_runner_.get());
35 // The Listener thread where Messages are handled must be a separate thread
36 // to avoid oversubscribing the IO thread. If you trigger this error, you
37 // need to either:
38 // 1) Create the ChannelProxy on a different thread, or
39 // 2) Just use Channel
40 // Note, we currently make an exception for a NULL listener. That usage
41 // basically works, but is outside the intent of ChannelProxy. This support
42 // will disappear, so please don't rely on it. See crbug.com/364241
43 DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
46 ChannelProxy::Context::~Context() {
49 void ChannelProxy::Context::ClearIPCTaskRunner() {
50 ipc_task_runner_ = NULL;
53 void ChannelProxy::Context::SetListenerTaskRunner(
54 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
55 DCHECK(ipc_task_runner_.get() != task_runner.get());
56 DCHECK(listener_task_runner_->BelongsToCurrentThread());
57 DCHECK(task_runner->BelongsToCurrentThread());
58 listener_task_runner_ = task_runner;
61 void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
62 DCHECK(!channel_);
63 channel_id_ = factory->GetName();
64 channel_ = factory->BuildChannel(this);
67 bool ChannelProxy::Context::TryFilters(const Message& message) {
68 DCHECK(message_filter_router_);
69 #ifdef IPC_MESSAGE_LOG_ENABLED
70 Logging* logger = Logging::GetInstance();
71 if (logger->Enabled())
72 logger->OnPreDispatchMessage(message);
73 #endif
75 if (message_filter_router_->TryFilters(message)) {
76 if (message.dispatch_error()) {
77 listener_task_runner_->PostTask(
78 FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
80 #ifdef IPC_MESSAGE_LOG_ENABLED
81 if (logger->Enabled())
82 logger->OnPostDispatchMessage(message, channel_id_);
83 #endif
84 return true;
86 return false;
89 // Called on the IPC::Channel thread
90 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
91 // First give a chance to the filters to process this message.
92 if (!TryFilters(message))
93 OnMessageReceivedNoFilter(message);
94 return true;
97 // Called on the IPC::Channel thread
98 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
99 listener_task_runner_->PostTask(
100 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
101 return true;
104 // Called on the IPC::Channel thread
105 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
106 // We cache off the peer_pid so it can be safely accessed from both threads.
107 peer_pid_ = channel_->GetPeerPID();
109 // Add any pending filters. This avoids a race condition where someone
110 // creates a ChannelProxy, calls AddFilter, and then right after starts the
111 // peer process. The IO thread could receive a message before the task to add
112 // the filter is run on the IO thread.
113 OnAddFilter();
115 // See above comment about using listener_task_runner_ here.
116 listener_task_runner_->PostTask(
117 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
120 // Called on the IPC::Channel thread
121 void ChannelProxy::Context::OnChannelError() {
122 for (size_t i = 0; i < filters_.size(); ++i)
123 filters_[i]->OnChannelError();
125 // See above comment about using listener_task_runner_ here.
126 listener_task_runner_->PostTask(
127 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
130 // Called on the IPC::Channel thread
131 void ChannelProxy::Context::OnChannelOpened() {
132 DCHECK(channel_ != NULL);
134 // Assume a reference to ourselves on behalf of this thread. This reference
135 // will be released when we are closed.
136 AddRef();
138 if (!channel_->Connect()) {
139 OnChannelError();
140 return;
143 for (size_t i = 0; i < filters_.size(); ++i)
144 filters_[i]->OnFilterAdded(channel_.get());
147 // Called on the IPC::Channel thread
148 void ChannelProxy::Context::OnChannelClosed() {
149 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
150 // would result in this branch being taken.
151 if (!channel_)
152 return;
154 for (size_t i = 0; i < filters_.size(); ++i) {
155 filters_[i]->OnChannelClosing();
156 filters_[i]->OnFilterRemoved();
159 // We don't need the filters anymore.
160 message_filter_router_->Clear();
161 filters_.clear();
162 // We don't need the lock, because at this point, the listener thread can't
163 // access it any more.
164 pending_filters_.clear();
166 channel_.reset();
168 // Balance with the reference taken during startup. This may result in
169 // self-destruction.
170 Release();
173 void ChannelProxy::Context::Clear() {
174 listener_ = NULL;
177 // Called on the IPC::Channel thread
178 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
179 if (!channel_) {
180 OnChannelClosed();
181 return;
184 if (!channel_->Send(message.release()))
185 OnChannelError();
188 // Called on the IPC::Channel thread
189 void ChannelProxy::Context::OnAddFilter() {
190 // Our OnChannelConnected method has not yet been called, so we can't be
191 // sure that channel_ is valid yet. When OnChannelConnected *is* called,
192 // it invokes OnAddFilter, so any pending filter(s) will be added at that
193 // time.
194 if (peer_pid_ == base::kNullProcessId)
195 return;
197 std::vector<scoped_refptr<MessageFilter> > new_filters;
199 base::AutoLock auto_lock(pending_filters_lock_);
200 new_filters.swap(pending_filters_);
203 for (size_t i = 0; i < new_filters.size(); ++i) {
204 filters_.push_back(new_filters[i]);
206 message_filter_router_->AddFilter(new_filters[i].get());
208 // The channel has already been created and connected, so we need to
209 // inform the filters right now.
210 new_filters[i]->OnFilterAdded(channel_.get());
211 new_filters[i]->OnChannelConnected(peer_pid_);
215 // Called on the IPC::Channel thread
216 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
217 if (peer_pid_ == base::kNullProcessId) {
218 // The channel is not yet connected, so any filters are still pending.
219 base::AutoLock auto_lock(pending_filters_lock_);
220 for (size_t i = 0; i < pending_filters_.size(); ++i) {
221 if (pending_filters_[i].get() == filter) {
222 filter->OnFilterRemoved();
223 pending_filters_.erase(pending_filters_.begin() + i);
224 return;
227 return;
229 if (!channel_)
230 return; // The filters have already been deleted.
232 message_filter_router_->RemoveFilter(filter);
234 for (size_t i = 0; i < filters_.size(); ++i) {
235 if (filters_[i].get() == filter) {
236 filter->OnFilterRemoved();
237 filters_.erase(filters_.begin() + i);
238 return;
242 NOTREACHED() << "filter to be removed not found";
245 // Called on the listener's thread
246 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
247 base::AutoLock auto_lock(pending_filters_lock_);
248 pending_filters_.push_back(make_scoped_refptr(filter));
249 ipc_task_runner_->PostTask(
250 FROM_HERE, base::Bind(&Context::OnAddFilter, this));
253 // Called on the listener's thread
254 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
255 #ifdef IPC_MESSAGE_LOG_ENABLED
256 Logging* logger = Logging::GetInstance();
257 std::string name;
258 logger->GetMessageText(message.type(), &name, &message, NULL);
259 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
260 "name", name);
261 #else
262 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
263 "class", IPC_MESSAGE_ID_CLASS(message.type()),
264 "line", IPC_MESSAGE_ID_LINE(message.type()));
265 #endif
267 if (!listener_)
268 return;
270 OnDispatchConnected();
272 #ifdef IPC_MESSAGE_LOG_ENABLED
273 if (message.type() == IPC_LOGGING_ID) {
274 logger->OnReceivedLoggingMessage(message);
275 return;
278 if (logger->Enabled())
279 logger->OnPreDispatchMessage(message);
280 #endif
282 listener_->OnMessageReceived(message);
283 if (message.dispatch_error())
284 listener_->OnBadMessageReceived(message);
286 #ifdef IPC_MESSAGE_LOG_ENABLED
287 if (logger->Enabled())
288 logger->OnPostDispatchMessage(message, channel_id_);
289 #endif
292 // Called on the listener's thread
293 void ChannelProxy::Context::OnDispatchConnected() {
294 if (channel_connected_called_)
295 return;
297 channel_connected_called_ = true;
298 if (listener_)
299 listener_->OnChannelConnected(peer_pid_);
302 // Called on the listener's thread
303 void ChannelProxy::Context::OnDispatchError() {
304 if (listener_)
305 listener_->OnChannelError();
308 // Called on the listener's thread
309 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
310 if (listener_)
311 listener_->OnBadMessageReceived(message);
314 //-----------------------------------------------------------------------------
316 // static
317 scoped_ptr<ChannelProxy> ChannelProxy::Create(
318 const IPC::ChannelHandle& channel_handle,
319 Channel::Mode mode,
320 Listener* listener,
321 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
322 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
323 channel->Init(channel_handle, mode, true);
324 return channel.Pass();
327 // static
328 scoped_ptr<ChannelProxy> ChannelProxy::Create(
329 scoped_ptr<ChannelFactory> factory,
330 Listener* listener,
331 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
332 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
333 channel->Init(factory.Pass(), true);
334 return channel.Pass();
337 ChannelProxy::ChannelProxy(Context* context)
338 : context_(context),
339 did_init_(false) {
342 ChannelProxy::ChannelProxy(
343 Listener* listener,
344 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
345 : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
348 ChannelProxy::~ChannelProxy() {
349 DCHECK(CalledOnValidThread());
351 Close();
354 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
355 Channel::Mode mode,
356 bool create_pipe_now) {
357 #if defined(OS_POSIX)
358 // When we are creating a server on POSIX, we need its file descriptor
359 // to be created immediately so that it can be accessed and passed
360 // to other processes. Forcing it to be created immediately avoids
361 // race conditions that may otherwise arise.
362 if (mode & Channel::MODE_SERVER_FLAG) {
363 create_pipe_now = true;
365 #endif // defined(OS_POSIX)
366 Init(ChannelFactory::Create(channel_handle, mode),
367 create_pipe_now);
370 void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
371 bool create_pipe_now) {
372 DCHECK(CalledOnValidThread());
373 DCHECK(!did_init_);
375 if (create_pipe_now) {
376 // Create the channel immediately. This effectively sets up the
377 // low-level pipe so that the client can connect. Without creating
378 // the pipe immediately, it is possible for a listener to attempt
379 // to connect and get an error since the pipe doesn't exist yet.
380 context_->CreateChannel(factory.Pass());
381 } else {
382 context_->ipc_task_runner()->PostTask(
383 FROM_HERE, base::Bind(&Context::CreateChannel,
384 context_.get(), Passed(factory.Pass())));
387 // complete initialization on the background thread
388 context_->ipc_task_runner()->PostTask(
389 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
391 did_init_ = true;
394 void ChannelProxy::Close() {
395 DCHECK(CalledOnValidThread());
397 // Clear the backpointer to the listener so that any pending calls to
398 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
399 // possible that the channel could be closed while it is receiving messages!
400 context_->Clear();
402 if (context_->ipc_task_runner()) {
403 context_->ipc_task_runner()->PostTask(
404 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
408 bool ChannelProxy::Send(Message* message) {
409 DCHECK(did_init_);
411 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
412 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
414 #ifdef IPC_MESSAGE_LOG_ENABLED
415 Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
416 #endif
418 context_->ipc_task_runner()->PostTask(
419 FROM_HERE,
420 base::Bind(&ChannelProxy::Context::OnSendMessage,
421 context_, base::Passed(scoped_ptr<Message>(message))));
422 return true;
425 void ChannelProxy::AddFilter(MessageFilter* filter) {
426 DCHECK(CalledOnValidThread());
428 context_->AddFilter(filter);
431 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
432 DCHECK(CalledOnValidThread());
434 context_->ipc_task_runner()->PostTask(
435 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
436 make_scoped_refptr(filter)));
439 void ChannelProxy::SetListenerTaskRunner(
440 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
441 DCHECK(CalledOnValidThread());
443 context()->SetListenerTaskRunner(task_runner);
446 void ChannelProxy::ClearIPCTaskRunner() {
447 DCHECK(CalledOnValidThread());
449 context()->ClearIPCTaskRunner();
452 #if defined(OS_POSIX) && !defined(OS_NACL_SFI)
453 // See the TODO regarding lazy initialization of the channel in
454 // ChannelProxy::Init().
455 int ChannelProxy::GetClientFileDescriptor() {
456 DCHECK(CalledOnValidThread());
458 Channel* channel = context_.get()->channel_.get();
459 // Channel must have been created first.
460 DCHECK(channel) << context_.get()->channel_id_;
461 return channel->GetClientFileDescriptor();
464 base::ScopedFD ChannelProxy::TakeClientFileDescriptor() {
465 DCHECK(CalledOnValidThread());
467 Channel* channel = context_.get()->channel_.get();
468 // Channel must have been created first.
469 DCHECK(channel) << context_.get()->channel_id_;
470 return channel->TakeClientFileDescriptor();
472 #endif
474 //-----------------------------------------------------------------------------
476 } // namespace IPC