1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_sync_channel.h"
8 #include "base/lazy_instance.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/synchronization/waitable_event.h"
12 #include "base/synchronization/waitable_event_watcher.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "base/threading/thread_local.h"
15 #include "base/trace_event/trace_event.h"
16 #include "ipc/ipc_channel_factory.h"
17 #include "ipc/ipc_logging.h"
18 #include "ipc/ipc_message_macros.h"
19 #include "ipc/ipc_sync_message.h"
21 using base::TimeDelta
;
22 using base::TimeTicks
;
23 using base::WaitableEvent
;
26 // When we're blocked in a Send(), we need to process incoming synchronous
27 // messages right away because it could be blocking our reply (either
28 // directly from the same object we're calling, or indirectly through one or
29 // more other channels). That means that in SyncContext's OnMessageReceived,
30 // we need to process sync message right away if we're blocked. However a
31 // simple check isn't sufficient, because the listener thread can be in the
32 // process of calling Send.
33 // To work around this, when SyncChannel filters a sync message, it sets
34 // an event that the listener thread waits on during its Send() call. This
35 // allows us to dispatch incoming sync messages when blocked. The race
36 // condition is handled because if Send is in the process of being called, it
37 // will check the event. In case the listener thread isn't sending a message,
38 // we queue a task on the listener thread to dispatch the received messages.
39 // The messages are stored in this queue object that's shared among all
40 // SyncChannel objects on the same thread (since one object can receive a
41 // sync message while another one is blocked).
43 class SyncChannel::ReceivedSyncMsgQueue
:
44 public base::RefCountedThreadSafe
<ReceivedSyncMsgQueue
> {
46 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
47 // if necessary. Call RemoveContext on the same thread when done.
// The instance is looked up through the thread-local pointer held in
// lazy_tls_ptr_.
48 static ReceivedSyncMsgQueue
* AddContext() {
49 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
50 // SyncChannel objects can block the same thread).
51 ReceivedSyncMsgQueue
* rv
= lazy_tls_ptr_
.Pointer()->Get();
// Create the queue and publish it in the thread-local slot.
// NOTE(review): the guard restricting this to the "no queue yet" case is not
// visible in this view -- confirm against the full file.
53 rv
= new ReceivedSyncMsgQueue();
54 ReceivedSyncMsgQueue::lazy_tls_ptr_
.Pointer()->Set(rv
);
// Count this context; RemoveContext() decrements the same counter and clears
// the thread-local pointer when it reaches zero.
56 rv
->listener_count_
++;
60 // Called on IPC thread when a synchronous message or reply arrives.
// Stores a heap copy of |msg| paired with |context| in message_queue_, signals
// dispatch_event_, and posts DispatchMessagesTask to the listener task runner
// unless a task was already pending.
61 void QueueMessage(const Message
& msg
, SyncChannel::SyncContext
* context
) {
62 bool was_task_pending
;
64 base::AutoLock
auto_lock(message_lock_
);
// Read the flag while holding message_lock_; DispatchMessagesTask clears it
// under the same lock.
66 was_task_pending
= task_pending_
;
69 // We set the event in case the listener thread is blocked (or is about
70 // to). In case it's not, the PostTask dispatches the messages.
71 message_queue_
.push_back(QueuedMessage(new Message(msg
), context
));
// Bump the version so a DispatchMessages() scan notices the queue changed.
72 message_queue_version_
++;
75 dispatch_event_
.Signal();
76 if (!was_task_pending
) {
// The bound scoped_refptr gives the posted task its own reference to
// |context|.
77 listener_task_runner_
->PostTask(
78 FROM_HERE
, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask
,
79 this, scoped_refptr
<SyncContext
>(context
)));
// Stores a heap copy of |msg| paired with |context| in received_replies_,
// the list that DispatchReplies() scans.
83 void QueueReply(const Message
&msg
, SyncChannel::SyncContext
* context
) {
84 received_replies_
.push_back(QueuedMessage(new Message(msg
), context
));
87 // Called on the listener's thread to process any queued synchronous
// messages. This is the task QueueMessage() posts; it clears task_pending_
// under message_lock_ and asks |context| to dispatch.
89 void DispatchMessagesTask(SyncContext
* context
) {
91 base::AutoLock
auto_lock(message_lock_
);
92 task_pending_
= false;
// NOTE(review): the extent of the lock scope is not visible in this view;
// confirm the dispatch below runs after message_lock_ is released.
94 context
->DispatchMessages();
// Dispatches queued messages on behalf of |dispatching_context|. A queued
// message is eligible when its own context has no restricted dispatch group
// (kRestrictDispatchGroup_None) or shares the group of |dispatching_context|.
// Each eligible message is removed from message_queue_ and handed to its
// context through OnDispatchMessage().
// NOTE(review): the outer loop, its exit condition and the clean-up of
// |message| are not visible in this view.
97 void DispatchMessages(SyncContext
* dispatching_context
) {
98 bool first_time
= true;
99 uint32_t expected_version
= 0;
100 SyncMessageQueue::iterator it
;
102 Message
* message
= NULL
;
103 scoped_refptr
<SyncChannel::SyncContext
> context
;
105 base::AutoLock
auto_lock(message_lock_
);
// Restart from the front on the first pass, or when the queue version no
// longer matches the value recorded after our last erase (QueueMessage and
// RemoveContext also bump it), since |it| may then be stale.
106 if (first_time
|| message_queue_version_
!= expected_version
) {
107 it
= message_queue_
.begin();
110 for (; it
!= message_queue_
.end(); it
++) {
111 int message_group
= it
->context
->restrict_dispatch_group();
112 if (message_group
== kRestrictDispatchGroup_None
||
113 message_group
== dispatching_context
->restrict_dispatch_group()) {
// Take the entry out of the queue; erase() yields the next iterator, and
// the version recorded here lets the next pass resume from |it|.
114 message
= it
->message
;
115 context
= it
->context
;
116 it
= message_queue_
.erase(it
);
117 message_queue_version_
++;
118 expected_version
= message_queue_version_
;
// Deliver the message to the context it was queued for.
126 context
->OnDispatchMessage(*message
);
131 // SyncChannel calls this in its destructor.
// Deletes and erases every entry of message_queue_ that belongs to |context|,
// bumping the queue version for each removal, then drops the listener count.
// When the count reaches zero the thread-local pointer is cleared.
// NOTE(review): no clean-up of received_replies_ is visible in this view.
132 void RemoveContext(SyncContext
* context
) {
133 base::AutoLock
auto_lock(message_lock_
);
135 SyncMessageQueue::iterator iter
= message_queue_
.begin();
136 while (iter
!= message_queue_
.end()) {
137 if (iter
->context
.get() == context
) {
// The queue holds raw Message pointers, so free the message before
// erasing its entry.
138 delete iter
->message
;
139 iter
= message_queue_
.erase(iter
);
140 message_queue_version_
++;
// Balances the increment done in AddContext().
146 if (--listener_count_
== 0) {
147 DCHECK(lazy_tls_ptr_
.Pointer()->Get());
148 lazy_tls_ptr_
.Pointer()->Set(NULL
);
// Returns the event that QueueMessage() signals.
152 WaitableEvent
* dispatch_event() { return &dispatch_event_
; }
// Returns the task runner captured when this queue was constructed.
153 base::SingleThreadTaskRunner
* listener_task_runner() {
154 return listener_task_runner_
.get();
157 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
// Set in AddContext() and reset to NULL in RemoveContext().
// NOTE(review): the declarator line is missing from this view; the matching
// out-of-line definition names it lazy_tls_ptr_.
158 static base::LazyInstance
<base::ThreadLocalPointer
<ReceivedSyncMsgQueue
> >
161 // Called on the ipc thread to check if we can unblock any current Send()
162 // calls based on a queued reply.
// Offers each stored reply, in order, to the context it was queued for; a
// reply accepted by TryToUnblockListener() is removed from the list.
163 void DispatchReplies() {
164 for (size_t i
= 0; i
< received_replies_
.size(); ++i
) {
165 Message
* message
= received_replies_
[i
].message
;
166 if (received_replies_
[i
].context
->TryToUnblockListener(message
)) {
// NOTE(review): erasing at index i shifts the later elements down. The
// statements around this erase (message deletion, loop exit) are not
// visible in this view -- confirm the loop does not continue with ++i.
168 received_replies_
.erase(received_replies_
.begin() + i
);
// Returns the watcher most recently stored with set_top_send_done_watcher();
// NULL until one is set (the constructor initializes it to NULL).
174 base::WaitableEventWatcher
* top_send_done_watcher() {
175 return top_send_done_watcher_
;
// Stores |watcher| as the current send done watcher for this thread. The
// pointer is kept as-is; nothing here takes ownership of it.
178 void set_top_send_done_watcher(base::WaitableEventWatcher
* watcher
) {
179 top_send_done_watcher_
= watcher
;
// RefCountedThreadSafe is befriended so it can reach the destructor below.
183 friend class base::RefCountedThreadSafe
<ReceivedSyncMsgQueue
>;
185 // See the comment in SyncChannel::SyncChannel for why this event is created
// as manual reset.
// NOTE(review): the explanation about always watching the dispatch event
// appears in SyncChannel::StartWatching in this file.
187 ReceivedSyncMsgQueue() :
188 message_queue_version_(0),
189 dispatch_event_(true, false),
// The task runner of the constructing thread is captured here; QueueMessage()
// posts DispatchMessagesTask to it.
190 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
191 task_pending_(false),
193 top_send_done_watcher_(NULL
) {
// Nothing to release explicitly here.
196 ~ReceivedSyncMsgQueue() {}
198 // Holds information about a queued synchronous message or reply.
// |message| is a raw pointer that the queue code deletes explicitly (see
// RemoveContext); |context| is held through a scoped_refptr.
199 struct QueuedMessage
{
200 QueuedMessage(Message
* m
, SyncContext
* c
) : message(m
), context(c
) { }
// NOTE(review): the declaration of the |message| member is not visible in
// this view.
202 scoped_refptr
<SyncChannel::SyncContext
> context
;
// Messages queued by QueueMessage() and drained by DispatchMessages().
205 typedef std::list
<QueuedMessage
> SyncMessageQueue
;
206 SyncMessageQueue message_queue_
;
207 uint32_t message_queue_version_
; // Used to signal DispatchMessages to rescan
// Replies stored by QueueReply() and scanned by DispatchReplies().
209 std::vector
<QueuedMessage
> received_replies_
;
211 // Set when we got a synchronous message that we must respond to as the
212 // sender needs its reply before it can reply to our original synchronous
// message.
214 WaitableEvent dispatch_event_
;
215 scoped_refptr
<base::SingleThreadTaskRunner
> listener_task_runner_
;
// Held while message_queue_, message_queue_version_ and task_pending_ are
// accessed in the methods above.
216 base::Lock message_lock_
;
220 // The current send done event watcher for this thread. Used to maintain
221 // a local global stack of send done watchers to ensure that nested sync
222 // message loops complete correctly.
223 base::WaitableEventWatcher
* top_send_done_watcher_
;
// Out-of-line definition of the static holder for the per-thread
// ReceivedSyncMsgQueue pointer used by AddContext() and RemoveContext().
226 base::LazyInstance
<base::ThreadLocalPointer
<SyncChannel::ReceivedSyncMsgQueue
> >
227 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_
=
228 LAZY_INSTANCE_INITIALIZER
;
// Builds a context on top of ChannelProxy::Context. Construction registers
// the context with the calling thread's ReceivedSyncMsgQueue through
// AddContext() and starts with no dispatch restriction.
// NOTE(review): the declaration of the |listener| parameter is not visible in
// this view.
230 SyncChannel::SyncContext::SyncContext(
232 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
,
233 WaitableEvent
* shutdown_event
)
234 : ChannelProxy::Context(listener
, ipc_task_runner
),
235 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
236 shutdown_event_(shutdown_event
),
237 restrict_dispatch_group_(kRestrictDispatchGroup_None
) {
// Loops until deserializers_ is empty.
// NOTE(review): the loop body is not visible in this view; presumably it
// releases one pending entry per iteration (see Pop()) -- confirm.
240 SyncChannel::SyncContext::~SyncContext() {
241 while (!deserializers_
.empty())
245 // Adds information about an outgoing sync message to the context so that
246 // we know how to deserialize the reply. This function returns nothing; the
247 // event that's set when the reply has arrived is read via GetSendDoneEvent().
248 void SyncChannel::SyncContext::Push(SyncMessage
* sync_msg
) {
249 // Create the tracking information for this message. This object is stored
250 // by value since all members are pointers that are cheap to copy. These
251 // pointers are cleaned up in the Pop() function.
253 // The event is created as manual reset because in between Signal and
254 // OnObjectSignalled, another Send can happen which would stop the watcher
255 // from being called. The event would get watched later, when the nested
256 // Send completes, so the event will need to remain set.
257 PendingSyncMsg
pending(SyncMessage::GetMessageId(*sync_msg
),
258 sync_msg
->GetReplyDeserializer(),
259 new WaitableEvent(true, false));
// deserializers_ is used as a stack: the newest pending message is at the
// back, which is what Pop() and GetSendDoneEvent() look at.
260 base::AutoLock
auto_lock(deserializers_lock_
);
261 deserializers_
.push_back(pending
);
// Removes the most recently pushed pending message, frees its deserializer
// and done event, and reports that message's send_result. Afterwards a
// DispatchReplies task is posted to the IPC task runner.
// NOTE(review): the declaration of |result| and the return statement are not
// visible in this view.
264 bool SyncChannel::SyncContext::Pop() {
267 base::AutoLock
auto_lock(deserializers_lock_
);
// |msg| is a copy of the back element, so clearing its done_event below only
// affects the local copy.
268 PendingSyncMsg msg
= deserializers_
.back();
269 delete msg
.deserializer
;
270 delete msg
.done_event
;
271 msg
.done_event
= NULL
;
272 deserializers_
.pop_back();
273 result
= msg
.send_result
;
276 // We got a reply to a synchronous Send() call that's blocking the listener
277 // thread. However, further down the call stack there could be another
278 // blocking Send() call, whose reply we received after we made this last
279 // Send() call. So check if we have any queued replies available that
280 // can now unblock the listener thread.
281 ipc_task_runner()->PostTask(
282 FROM_HERE
, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies
,
283 received_sync_msgs_
.get()));
// Returns the done event of the most recently pushed pending message. Reads
// deserializers_.back() under the lock, so at least one message must be
// pending when this is called.
288 WaitableEvent
* SyncChannel::SyncContext::GetSendDoneEvent() {
289 base::AutoLock
auto_lock(deserializers_lock_
);
290 return deserializers_
.back().done_event
;
// Returns the dispatch event owned by this context's ReceivedSyncMsgQueue.
293 WaitableEvent
* SyncChannel::SyncContext::GetDispatchEvent() {
294 return received_sync_msgs_
->dispatch_event();
// Asks the ReceivedSyncMsgQueue to dispatch queued messages, passing this
// context as the dispatching context.
297 void SyncChannel::SyncContext::DispatchMessages() {
298 received_sync_msgs_
->DispatchMessages(this);
// Checks whether |msg| is the reply to the most recently pushed pending
// message. If it is, the reply's output parameters are read through the
// stored deserializer (unless it is an error reply), the outcome is recorded
// in send_result, and the pending message's done event is signaled.
// NOTE(review): the return statements are not visible in this view.
301 bool SyncChannel::SyncContext::TryToUnblockListener(const Message
* msg
) {
302 base::AutoLock
auto_lock(deserializers_lock_
);
// Only the back of deserializers_ (the newest pending send) is considered.
303 if (deserializers_
.empty() ||
304 !SyncMessage::IsMessageReplyTo(*msg
, deserializers_
.back().id
)) {
308 // TODO(bauerb): Remove logging once investigation of http://crbug.com/141055
// is done.
310 if (!msg
->is_reply_error()) {
311 bool send_result
= deserializers_
.back().deserializer
->
312 SerializeOutputParameters(*msg
);
313 deserializers_
.back().send_result
= send_result
;
314 VLOG_IF(1, !send_result
) << "Couldn't deserialize reply message";
316 VLOG(1) << "Received error reply";
318 deserializers_
.back().done_event
->Signal();
// Signals every pending send's done event, then unregisters this context
// from the per-thread ReceivedSyncMsgQueue.
323 void SyncChannel::SyncContext::Clear() {
324 CancelPendingSends();
325 received_sync_msgs_
->RemoveContext(this);
// Routes an incoming message. In the order visible here: a reply that
// matches the newest pending send unblocks the listener; any other reply is
// queued with QueueReply(); a message marked should_unblock() is queued with
// QueueMessage(); everything else goes to Context::OnMessageReceivedNoFilter.
// NOTE(review): the filter call and the early returns between these steps are
// not visible in this view.
329 bool SyncChannel::SyncContext::OnMessageReceived(const Message
& msg
) {
330 // Give the filters a chance at processing this message.
334 if (TryToUnblockListener(&msg
))
337 if (msg
.is_reply()) {
338 received_sync_msgs_
->QueueReply(msg
, this);
342 if (msg
.should_unblock()) {
343 received_sync_msgs_
->QueueMessage(msg
, this);
347 return Context::OnMessageReceivedNoFilter(msg
);
// On a channel error: signal all pending sends, stop watching for shutdown,
// then run the base class handling.
350 void SyncChannel::SyncContext::OnChannelError() {
351 CancelPendingSends();
352 shutdown_watcher_
.StopWatching();
353 Context::OnChannelError();
// Starts shutdown_watcher_ with OnWaitableEventSignaled as its callback, then
// runs the base class handling.
// NOTE(review): the event argument passed to StartWatching is not visible in
// this view. The callback is bound with base::Unretained(this), so it holds
// no reference to this context.
356 void SyncChannel::SyncContext::OnChannelOpened() {
357 shutdown_watcher_
.StartWatching(
359 base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled
,
360 base::Unretained(this)));
361 Context::OnChannelOpened();
// On channel close: signal all pending sends, stop watching for shutdown,
// then run the base class handling.
364 void SyncChannel::SyncContext::OnChannelClosed() {
365 CancelPendingSends();
366 shutdown_watcher_
.StopWatching();
367 Context::OnChannelClosed();
// Searches the pending sends for the one whose id equals |message_id| and
// signals its done event.
// NOTE(review): whether the loop stops after the first match is not visible
// in this view.
370 void SyncChannel::SyncContext::OnSendTimeout(int message_id
) {
371 base::AutoLock
auto_lock(deserializers_lock_
);
372 PendingSyncMessageQueue::iterator iter
;
373 VLOG(1) << "Send timeout";
374 for (iter
= deserializers_
.begin(); iter
!= deserializers_
.end(); iter
++) {
375 if (iter
->id
== message_id
) {
376 iter
->done_event
->Signal();
// Signals the done event of every pending send while holding
// deserializers_lock_. Entries are not removed here.
382 void SyncChannel::SyncContext::CancelPendingSends() {
383 base::AutoLock
auto_lock(deserializers_lock_
);
384 PendingSyncMessageQueue::iterator iter
;
385 // TODO(bauerb): Remove once http://crbug/141055 is fixed.
386 VLOG(1) << "Canceling pending sends";
387 for (iter
= deserializers_
.begin(); iter
!= deserializers_
.end(); iter
++)
388 iter
->done_event
->Signal();
// Watcher callback. For the shutdown event it cancels all pending sends; the
// other visible path expects |event| to be the current send done event and
// quits the current message loop.
// NOTE(review): the else that separates the two paths is not visible in this
// view.
391 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent
* event
) {
392 if (event
== shutdown_event_
) {
393 // Process shut down before we can get a reply to a synchronous message.
394 // Cancel pending Send calls, which will end up setting the send done event.
395 CancelPendingSends();
397 // We got the reply, timed out or the process shutdown.
398 DCHECK_EQ(GetSendDoneEvent(), event
);
399 base::MessageLoop::current()->QuitNow();
// Returns a callback that invokes OnWaitableEventSignaled on this context.
// |this| is bound directly rather than through base::Unretained as in
// OnChannelOpened().
403 base::WaitableEventWatcher::EventCallback
404 SyncChannel::SyncContext::MakeWaitableEventCallback() {
405 return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled
, this);
// Creates a SyncChannel through the three-argument Create() overload and
// initializes it from |channel_handle|.
// NOTE(review): the declarations of the |mode| and |listener| parameters are
// not visible in this view.
409 scoped_ptr
<SyncChannel
> SyncChannel::Create(
410 const IPC::ChannelHandle
& channel_handle
,
413 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
,
414 bool create_pipe_now
,
415 base::WaitableEvent
* shutdown_event
,
416 AttachmentBroker
* broker
) {
417 scoped_ptr
<SyncChannel
> channel
=
418 Create(listener
, ipc_task_runner
, shutdown_event
);
419 channel
->Init(channel_handle
, mode
, create_pipe_now
, broker
);
420 return channel
.Pass();
// Creates a SyncChannel through the three-argument Create() overload and
// initializes it with |factory|, whose ownership is passed on to Init().
// NOTE(review): the declaration of the |listener| parameter is not visible in
// this view.
424 scoped_ptr
<SyncChannel
> SyncChannel::Create(
425 scoped_ptr
<ChannelFactory
> factory
,
427 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
,
428 bool create_pipe_now
,
429 base::WaitableEvent
* shutdown_event
) {
430 scoped_ptr
<SyncChannel
> channel
=
431 Create(listener
, ipc_task_runner
, shutdown_event
);
432 channel
->Init(factory
.Pass(), create_pipe_now
);
433 return channel
.Pass();
// Constructs a SyncChannel without calling Init() on it; the other Create()
// overloads build on this one.
// NOTE(review): the declaration of the |listener| parameter is not visible in
// this view.
437 scoped_ptr
<SyncChannel
> SyncChannel::Create(
439 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
,
440 WaitableEvent
* shutdown_event
) {
441 return make_scoped_ptr(
442 new SyncChannel(listener
, ipc_task_runner
, shutdown_event
));
// Constructs the channel around a newly created SyncContext.
// NOTE(review): the declaration of the |listener| parameter and any statements
// after the DCHECK are not visible in this view.
445 SyncChannel::SyncChannel(
447 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
,
448 WaitableEvent
* shutdown_event
)
449 : ChannelProxy(new SyncContext(listener
, ipc_task_runner
, shutdown_event
)) {
450 // The current (listener) thread must be distinct from the IPC thread, or else
451 // sending synchronous messages will deadlock.
452 DCHECK_NE(ipc_task_runner
.get(), base::ThreadTaskRunnerHandle::Get().get());
// NOTE(review): the destructor body is not visible in this view.
456 SyncChannel::~SyncChannel() {
// Stores |group| on the sync context. ReceivedSyncMsgQueue::DispatchMessages
// compares this value between contexts to decide which queued messages may be
// dispatched.
459 void SyncChannel::SetRestrictDispatchChannelGroup(int group
) {
460 sync_context()->set_restrict_dispatch_group(group
);
// Creates a SyncMessageFilter from the context's shutdown event and its
// current IsChannelSendThreadSafe() value, and adds it as a filter on this
// channel.
// NOTE(review): the condition guarding the push_back below and the return
// statement are not visible in this view. OnChannelInit() later refreshes the
// thread-safety flag of every filter recorded in
// pre_init_sync_message_filters_.
463 scoped_refptr
<SyncMessageFilter
> SyncChannel::CreateSyncMessageFilter() {
464 scoped_refptr
<SyncMessageFilter
> filter
= new SyncMessageFilter(
465 sync_context()->shutdown_event(),
466 sync_context()->IsChannelSendThreadSafe());
467 AddFilter(filter
.get());
469 pre_init_sync_message_filters_
.push_back(filter
);
// Sends |message|. Messages that are not synchronous are forwarded straight
// to ChannelProxy::Send(). For a synchronous message the context records the
// pending reply (Push), the message is sent, the calling thread waits in
// WaitForReply(), and the value returned is the one Pop() reports.
// NOTE(review): the early returns (non-sync path, shutdown already signaled)
// and the #else/#endif of the logging block are not visible in this view.
473 bool SyncChannel::Send(Message
* message
) {
474 #ifdef IPC_MESSAGE_LOG_ENABLED
476 Logging::GetInstance()->GetMessageText(message
->type(), &name
, message
, NULL
);
477 TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name
);
479 TRACE_EVENT2("ipc", "SyncChannel::Send",
480 "class", IPC_MESSAGE_ID_CLASS(message
->type()),
481 "line", IPC_MESSAGE_ID_LINE(message
->type()));
483 if (!message
->is_sync()) {
484 ChannelProxy::Send(message
);
488 // *this* might get deleted in WaitForReply.
// Holding a reference keeps the context usable for Pop() even then.
489 scoped_refptr
<SyncContext
> context(sync_context());
490 if (context
->shutdown_event()->IsSignaled()) {
491 VLOG(1) << "shutdown event is signaled";
// Register the pending reply before the message goes out.
496 SyncMessage
* sync_msg
= static_cast<SyncMessage
*>(message
);
497 context
->Push(sync_msg
);
// Read the pump event before sending.
// NOTE(review): presumably because |message| must not be touched after
// ChannelProxy::Send() -- confirm ownership against the full file.
498 WaitableEvent
* pump_messages_event
= sync_msg
->pump_messages_event();
500 ChannelProxy::Send(message
);
502 // Wait for reply, or for any other incoming synchronous messages.
503 // *this* might get deleted, so only call static functions at this point.
504 WaitForReply(context
.get(), pump_messages_event
);
506 return context
->Pop();
// Blocks until the pending send completes. It waits on the dispatch event,
// the send done event and, when |pump_messages_event| is non-null, a third
// event. A dispatch wake-up resets the event and dispatches queued messages;
// a wake-up on index 2 switches to WaitForReplyWithNestedMessageLoop().
// NOTE(review): the enclosing loop, its exits and the third array element are
// not visible in this view.
509 void SyncChannel::WaitForReply(
510 SyncContext
* context
, WaitableEvent
* pump_messages_event
) {
// Handle anything already queued before waiting.
511 context
->DispatchMessages();
513 WaitableEvent
* objects
[] = {
514 context
->GetDispatchEvent(),
515 context
->GetSendDoneEvent(),
// Only include the third slot in the wait when a pump event was supplied.
519 unsigned count
= pump_messages_event
? 3: 2;
// |result| is compared against array indices below (0 = dispatch event,
// 2 = pump messages event).
520 size_t result
= WaitableEvent::WaitMany(objects
, count
);
521 if (result
== 0 /* dispatch event */) {
522 // We're waiting for a reply, but we received a blocking synchronous
523 // call. We must process it or otherwise a deadlock might occur.
524 context
->GetDispatchEvent()->Reset();
525 context
->DispatchMessages();
529 if (result
== 2 /* pump_messages_event */)
530 WaitForReplyWithNestedMessageLoop(context
); // Run a nested message loop.
// Waits for the send done event by running the current MessageLoop with
// nested tasks allowed. A local watcher on the send done event becomes the
// thread's top send done watcher for the duration; the previously registered
// watcher is stopped first and restored (and restarted) afterwards.
536 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext
* context
) {
537 base::WaitableEventWatcher send_done_watcher
;
539 ReceivedSyncMsgQueue
* sync_msg_queue
= context
->received_sync_msgs();
540 DCHECK(sync_msg_queue
!= NULL
);
542 base::WaitableEventWatcher
* old_send_done_event_watcher
=
543 sync_msg_queue
->top_send_done_watcher();
545 base::WaitableEventWatcher::EventCallback old_callback
;
546 base::WaitableEvent
* old_event
= NULL
;
548 // Maintain a local global stack of send done delegates to ensure that
549 // nested sync calls complete in the correct sequence, i.e. the
550 // outermost call completes first, etc.
551 if (old_send_done_event_watcher
) {
// Save what the outer watcher was doing so it can be restarted below.
552 old_callback
= old_send_done_event_watcher
->callback();
553 old_event
= old_send_done_event_watcher
->GetWatchedEvent();
554 old_send_done_event_watcher
->StopWatching();
557 sync_msg_queue
->set_top_send_done_watcher(&send_done_watcher
);
// The callback (SyncContext::OnWaitableEventSignaled) quits the message loop
// run below once the send done event fires.
559 send_done_watcher
.StartWatching(context
->GetSendDoneEvent(),
560 context
->MakeWaitableEventCallback());
563 base::MessageLoop::ScopedNestableTaskAllower
allow(
564 base::MessageLoop::current());
565 base::MessageLoop::current()->Run();
// Restore the previous top watcher; &send_done_watcher must not stay
// registered once this function returns.
568 sync_msg_queue
->set_top_send_done_watcher(old_send_done_event_watcher
);
569 if (old_send_done_event_watcher
&& old_event
) {
570 old_send_done_event_watcher
->StartWatching(old_event
, old_callback
);
// Callback for dispatch_watcher_: re-arms the watcher on the dispatch event
// and then dispatches the queued messages.
// NOTE(review): a statement between the comment below and StartWatching is
// not visible in this view (WaitForReply resets the dispatch event at the
// equivalent point).
574 void SyncChannel::OnWaitableEventSignaled(WaitableEvent
* event
) {
575 DCHECK(event
== sync_context()->GetDispatchEvent());
576 // The call to DispatchMessages might delete this object, so reregister
577 // the object watcher first.
579 dispatch_watcher_
.StartWatching(event
, dispatch_watcher_callback_
);
580 sync_context()->DispatchMessages();
// Binds OnWaitableEventSignaled as the dispatch watcher callback and starts
// watching the context's dispatch event. The callback is stored in a member
// so OnWaitableEventSignaled() can reuse it when it re-arms the watcher.
583 void SyncChannel::StartWatching() {
584 // Ideally we only want to watch this object when running a nested message
585 // loop. However, we don't know when it exits if there's another nested
586 // message loop running under it or not, so we wouldn't know whether to
587 // stop or keep watching. So we always watch it, and create the event as
588 // manual reset since the object watcher might otherwise reset the event
589 // when we're doing a WaitMany.
590 dispatch_watcher_callback_
=
591 base::Bind(&SyncChannel::OnWaitableEventSignaled
,
592 base::Unretained(this));
593 dispatch_watcher_
.StartWatching(sync_context()->GetDispatchEvent(),
594 dispatch_watcher_callback_
);
// Updates every filter recorded in pre_init_sync_message_filters_ with the
// context's current IsChannelSendThreadSafe() value, then empties that list.
597 void SyncChannel::OnChannelInit() {
598 for (const auto& filter
: pre_init_sync_message_filters_
) {
599 filter
->set_is_channel_send_thread_safe(
600 context()->IsChannelSendThreadSafe());
602 pre_init_sync_message_filters_
.clear();