1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
4 /* This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 #ifndef ipc_glue_MessageChannel_h
9 #define ipc_glue_MessageChannel_h 1
11 #include "base/basictypes.h"
12 #include "base/message_loop.h"
14 #include "mozilla/DebugOnly.h"
15 #include "mozilla/Monitor.h"
16 #include "mozilla/Vector.h"
17 #include "mozilla/WeakPtr.h"
18 #include "mozilla/ipc/Transport.h"
19 #include "MessageLink.h"
20 #include "nsAutoPtr.h"
31 class RefCountedMonitor
: public Monitor
35 : Monitor("mozilla.ipc.MessageChannel.mMonitor")
38 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedMonitor
)
41 ~RefCountedMonitor() {}
44 class MessageChannel
: HasResultCodes
46 friend class ProcessLink
;
47 friend class ThreadLink
;
52 typedef mozilla::Monitor Monitor
;
55 static const int32_t kNoTimeout
;
57 typedef IPC::Message Message
;
58 typedef mozilla::ipc::Transport Transport
;
60 explicit MessageChannel(MessageListener
*aListener
);
63 // "Open" from the perspective of the transport layer; the underlying
64 // socketpair/pipe should already be created.
66 // Returns true if the transport layer was successfully connected,
67 // i.e., mChannelState == ChannelConnected.
68 bool Open(Transport
* aTransport
, MessageLoop
* aIOLoop
=0, Side aSide
=UnknownSide
);
70 // "Open" a connection to another thread in the same process.
72 // Returns true if the transport layer was successfully connected,
73 // i.e., mChannelState == ChannelConnected.
75 // For more details on the process of opening a channel between
76 // threads, see the extended comment on this function
77 // in MessageChannel.cpp.
78 bool Open(MessageChannel
*aTargetChan
, MessageLoop
*aTargetLoop
, Side aSide
);
80 // Close the underlying transport channel.
// Force the channel to behave as if a channel error occurred. Valid
// for process links only, not thread links.
void CloseWithError();

void CloseWithTimeout();
89 void SetAbortOnError(bool abort
)
94 // Misc. behavioral traits consumers can request for this channel
97 // Windows: if this channel operates on the UI thread, indicates
98 // WindowsMessageLoop code should enable deferred native message
99 // handling to prevent deadlocks. Should only be used for protocols
100 // that manage child processes which might create native UI, like
102 REQUIRE_DEFERRED_MESSAGE_PROTECTION
= 1 << 0
104 void SetChannelFlags(ChannelFlags aFlags
) { mFlags
= aFlags
; }
105 ChannelFlags
GetChannelFlags() { return mFlags
; }
109 bool ShouldBlockScripts() const
111 return mBlockScripts
;
114 // Asynchronously send a message to the other side of the channel
115 bool Send(Message
* aMsg
);
117 // Asynchronously deliver a message back to this side of the
119 bool Echo(Message
* aMsg
);
121 // Synchronously send |msg| (i.e., wait for |reply|)
122 bool Send(Message
* aMsg
, Message
* aReply
);
124 // Make an Interrupt call to the other side of the channel
125 bool Call(Message
* aMsg
, Message
* aReply
);
127 // Wait until a message is received
128 bool WaitForIncomingMessage();
130 bool CanSend() const;
132 void SetReplyTimeoutMs(int32_t aTimeoutMs
);
134 bool IsOnCxxStack() const {
135 return !mCxxStackFrames
.empty();
// Defined out of line.
void FlushPendingInterruptQueue();
140 // Unsound_IsClosed and Unsound_NumQueuedMessages are safe to call from any
141 // thread, but they make no guarantees about whether you'll get an
142 // up-to-date value; the values are written on one thread and read without
143 // locking, on potentially different threads. Thus you should only use
144 // them when you don't particularly care about getting a recent value (e.g.
145 // in a memory report).
146 bool Unsound_IsClosed() const {
147 return mLink
? mLink
->Unsound_IsClosed() : true;
149 uint32_t Unsound_NumQueuedMessages() const {
150 return mLink
? mLink
->Unsound_NumQueuedMessages() : 0;
153 static bool IsPumpingMessages() {
154 return sIsPumpingMessages
;
156 static void SetIsPumpingMessages(bool aIsPumping
) {
157 sIsPumpingMessages
= aIsPumping
;
161 struct MOZ_STACK_CLASS SyncStackFrame
163 SyncStackFrame(MessageChannel
* channel
, bool interrupt
);
167 bool mSpinNestedEvents
;
168 bool mListenerNotified
;
169 MessageChannel
* mChannel
;
171 // The previous stack frame for this channel.
172 SyncStackFrame
* mPrev
;
174 // The previous stack frame on any channel.
175 SyncStackFrame
* mStaticPrev
;
177 friend struct MessageChannel::SyncStackFrame
;
179 static bool IsSpinLoopActive() {
180 for (SyncStackFrame
* frame
= sStaticTopFrame
; frame
; frame
= frame
->mPrev
) {
181 if (frame
->mSpinNestedEvents
)
188 // The deepest sync stack frame for this channel.
189 SyncStackFrame
* mTopFrame
;
191 bool mIsSyncWaitingOnNonMainThread
;
193 // The deepest sync stack frame on any channel.
194 static SyncStackFrame
* sStaticTopFrame
;
// Defined out of line.
void ProcessNativeEventsInInterruptCall();
static void NotifyGeckoEventDispatch();

void SpinInternalEventLoop();
205 void CommonThreadOpenInit(MessageChannel
*aTargetChan
, Side aSide
);
206 void OnOpenAsSlave(MessageChannel
*aTargetChan
, Side aSide
);
208 void PostErrorNotifyTask();
209 void OnNotifyMaybeChannelError();
210 void ReportConnectionError(const char* aChannelName
) const;
211 void ReportMessageRouteError(const char* channelName
) const;
212 bool MaybeHandleError(Result code
, const Message
& aMsg
, const char* channelName
);
216 // Send OnChannelConnected notification to listeners.
217 void DispatchOnChannelConnected();
219 bool InterruptEventOccurred();
220 bool HasPendingEvents();
222 bool ProcessPendingRequest(const Message
&aUrgent
);
224 void MaybeUndeferIncall();
225 void EnqueuePendingMessages();
227 // Executed on the worker thread. Dequeues one pending message.
228 bool OnMaybeDequeueOne();
229 bool DequeueOne(Message
*recvd
);
231 // Dispatches an incoming message to its appropriate handler.
232 void DispatchMessage(const Message
&aMsg
);
234 // DispatchMessage will route to one of these functions depending on the
235 // protocol type of the message.
236 void DispatchSyncMessage(const Message
&aMsg
);
237 void DispatchUrgentMessage(const Message
&aMsg
);
238 void DispatchAsyncMessage(const Message
&aMsg
);
239 void DispatchRPCMessage(const Message
&aMsg
);
240 void DispatchInterruptMessage(const Message
&aMsg
, size_t aStackDepth
);
// Return true if the wait ended because a notification was received.
//
// Return false if the time elapsed from when we started the process of
// waiting until afterwards exceeded the currently allotted timeout.
// That *DOES NOT* mean false => "no event" (== timeout); there are many
// circumstances that could cause the measured elapsed time to exceed the
// timeout EVEN WHEN we were notified.
//
// So in sum: true is a meaningful return value; false is not, because it
// does not prove that no event arrived.
bool WaitForSyncNotify();
bool WaitForInterruptNotify();

bool WaitResponse(bool aWaitTimedOut);

bool ShouldContinueFromTimeout();
259 // The "remote view of stack depth" can be different than the
260 // actual stack depth when there are out-of-turn replies. When we
261 // receive one, our actual Interrupt stack depth doesn't decrease, but
262 // the other side (that sent the reply) thinks it has. So, the
263 // "view" returned here is |stackDepth| minus the number of
264 // out-of-turn replies.
266 // Only called from the worker thread.
267 size_t RemoteViewOfStackDepth(size_t stackDepth
) const {
268 AssertWorkerThread();
269 return stackDepth
- mOutOfTurnReplies
.size();
272 int32_t NextSeqno() {
273 AssertWorkerThread();
274 return (mSide
== ChildSide
) ? --mNextSeqno
: ++mNextSeqno
;
277 // This helper class manages mCxxStackDepth on behalf of MessageChannel.
278 // When the stack depth is incremented from zero to non-zero, it invokes
279 // a callback, and similarly for when the depth goes from non-zero to zero.
280 void EnteredCxxStack() {
281 mListener
->OnEnteredCxxStack();
// Counterpart of EnteredCxxStack(); defined out of line.
void ExitedCxxStack();
287 mListener
->OnEnteredCall();
291 mListener
->OnExitedCall();
294 MessageListener
*Listener() const {
295 return mListener
.get();
298 void DebugAbort(const char* file
, int line
, const char* cond
,
300 bool reply
=false) const;
302 // This method is only safe to call on the worker thread, or in a
303 // debugger with all threads paused.
304 void DumpInterruptStack(const char* const pfx
="") const;
307 // Called from both threads
308 size_t InterruptStackDepth() const {
309 mMonitor
->AssertCurrentThreadOwns();
310 return mInterruptStack
.size();
313 // Returns true if we're blocking waiting for a reply.
314 bool AwaitingSyncReply() const {
315 mMonitor
->AssertCurrentThreadOwns();
316 return mAwaitingSyncReply
;
318 int AwaitingSyncReplyPriority() const {
319 mMonitor
->AssertCurrentThreadOwns();
320 return mAwaitingSyncReplyPriority
;
322 bool AwaitingInterruptReply() const {
323 mMonitor
->AssertCurrentThreadOwns();
324 return !mInterruptStack
.empty();
326 bool AwaitingIncomingMessage() const {
327 mMonitor
->AssertCurrentThreadOwns();
328 return mIsWaitingForIncoming
;
331 class MOZ_STACK_CLASS AutoEnterWaitForIncoming
334 explicit AutoEnterWaitForIncoming(MessageChannel
& aChannel
)
337 aChannel
.mMonitor
->AssertCurrentThreadOwns();
338 aChannel
.mIsWaitingForIncoming
= true;
341 ~AutoEnterWaitForIncoming()
343 mChannel
.mIsWaitingForIncoming
= false;
347 MessageChannel
& mChannel
;
349 friend class AutoEnterWaitForIncoming
;
351 // Returns true if we're dispatching a sync message's callback.
352 bool DispatchingSyncMessage() const {
353 AssertWorkerThread();
354 return mDispatchingSyncMessage
;
357 int DispatchingSyncMessagePriority() const {
358 AssertWorkerThread();
359 return mDispatchingSyncMessagePriority
;
362 bool DispatchingAsyncMessage() const {
363 AssertWorkerThread();
364 return mDispatchingAsyncMessage
;
367 int DispatchingAsyncMessagePriority() const {
368 AssertWorkerThread();
369 return mDispatchingAsyncMessagePriority
;
372 bool Connected() const;
375 // Executed on the IO thread.
376 void NotifyWorkerThread();
378 // Return true if |aMsg| is a special message targeted at the IO
379 // thread, in which case it shouldn't be delivered to the worker.
380 bool MaybeInterceptSpecialIOMessage(const Message
& aMsg
);
382 void OnChannelConnected(int32_t peer_id
);
384 // Tell the IO thread to close the channel and wait for it to ACK.
385 void SynchronouslyClose();
387 bool ShouldDeferMessage(const Message
& aMsg
);
388 void OnMessageReceivedFromLink(const Message
& aMsg
);
389 void OnChannelErrorFromLink();
392 // Run on the not current thread.
393 void NotifyChannelClosed();
394 void NotifyMaybeChannelError();
397 // Can be run on either thread
398 void AssertWorkerThread() const
400 NS_ABORT_IF_FALSE(mWorkerLoopID
== MessageLoop::current()->id(),
401 "not on worker thread!");
404 // The "link" thread is either the I/O thread (ProcessLink) or the
405 // other actor's work thread (ThreadLink). In either case, it is
406 // NOT our worker thread.
407 void AssertLinkThread() const
409 NS_ABORT_IF_FALSE(mWorkerLoopID
!= MessageLoop::current()->id(),
410 "on worker thread but should not be!");
414 typedef IPC::Message::msgid_t msgid_t
;
415 typedef std::deque
<Message
> MessageQueue
;
416 typedef std::map
<size_t, Message
> MessageMap
;
418 // All dequeuing tasks require a single point of cancellation,
419 // which is handled via a reference-counted task.
423 explicit RefCountedTask(CancelableTask
* aTask
)
427 ~RefCountedTask() { delete mTask
; }
429 void Run() { mTask
->Run(); }
430 void Cancel() { mTask
->Cancel(); }
432 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedTask
)
435 CancelableTask
* mTask
;
438 // Wrap an existing task which can be cancelled at any time
439 // without the wrapper's knowledge.
440 class DequeueTask
: public Task
443 explicit DequeueTask(RefCountedTask
* aTask
)
446 void Run() { mTask
->Run(); }
449 nsRefPtr
<RefCountedTask
> mTask
;
453 mozilla::WeakPtr
<MessageListener
> mListener
;
454 ChannelState mChannelState
;
455 nsRefPtr
<RefCountedMonitor
> mMonitor
;
458 MessageLoop
* mWorkerLoop
; // thread where work is done
459 CancelableTask
* mChannelErrorTask
; // NotifyMaybeChannelError runnable
461 // id() of mWorkerLoop. This persists even after mWorkerLoop is cleared
462 // during channel shutdown.
465 // A task encapsulating dequeuing one pending message.
466 nsRefPtr
<RefCountedTask
> mDequeueOneTask
;
468 // Timeout periods are broken up in two to prevent system suspension from
469 // triggering an abort. This method (called by WaitForEvent with a 'did
470 // timeout' flag) decides if we should wait again for half of mTimeoutMs
473 bool mInTimeoutSecondHalf
;
475 // Worker-thread only; sequence numbers for messages that require
476 // synchronous replies.
// Class-wide flag read by IsPumpingMessages() and written by
// SetIsPumpingMessages().
static bool sIsPumpingMessages;
484 explicit AutoSetValue(T
&var
, const T
&newValue
)
485 : mVar(var
), mPrev(var
)
// Worker thread only.
bool mAwaitingSyncReply;
int mAwaitingSyncReplyPriority;

// Set while we are dispatching a synchronous message. The accessors
// DispatchingSyncMessage() and DispatchingSyncMessagePriority() assert that
// they run on the worker thread.
bool mDispatchingSyncMessage;
int mDispatchingSyncMessagePriority;

// Asynchronous counterparts, read through DispatchingAsyncMessage() and
// DispatchingAsyncMessagePriority().
bool mDispatchingAsyncMessage;
int mDispatchingAsyncMessagePriority;
509 // When we send an urgent request from the parent process, we could race
510 // with an RPC message that was issued by the child beforehand. In this
511 // case, if the parent were to wake up while waiting for the urgent reply,
512 // and process the RPC, it could send an additional urgent message. The
513 // child would wake up to process the urgent message (as it always will),
514 // then send a reply, which could be received by the parent out-of-order
515 // with respect to the first urgent reply.
517 // To address this problem, urgent or RPC requests are associated with a
518 // "transaction". Whenever one side of the channel wishes to start a
519 // chain of RPC/urgent messages, it allocates a new transaction ID. Any
520 // messages the parent receives that are not a part of this transaction are
521 // deferred. When issuing RPC/urgent requests on top of a started
522 // transaction, the initiating transaction ID is used.
524 // To ensure IDs are unique, we use sequence numbers for transaction IDs,
525 // which grow in opposite directions from child to parent.
// ID of the transaction currently in progress. AutoEnterTransaction treats 0
// as "no transaction" when deciding whether to start a new one.
int32_t mCurrentTransaction;
530 class AutoEnterTransaction
533 explicit AutoEnterTransaction(MessageChannel
*aChan
, int32_t aMsgSeqno
)
535 mOldTransaction(mChan
->mCurrentTransaction
)
537 mChan
->mMonitor
->AssertCurrentThreadOwns();
538 if (mChan
->mCurrentTransaction
== 0)
539 mChan
->mCurrentTransaction
= aMsgSeqno
;
541 explicit AutoEnterTransaction(MessageChannel
*aChan
, const Message
&aMessage
)
543 mOldTransaction(mChan
->mCurrentTransaction
)
545 mChan
->mMonitor
->AssertCurrentThreadOwns();
547 if (!aMessage
.is_sync())
550 MOZ_ASSERT_IF(mChan
->mSide
== ParentSide
&& mOldTransaction
!= aMessage
.transaction_id(),
551 !mOldTransaction
|| aMessage
.priority() > mChan
->AwaitingSyncReplyPriority());
552 mChan
->mCurrentTransaction
= aMessage
.transaction_id();
554 ~AutoEnterTransaction() {
555 mChan
->mMonitor
->AssertCurrentThreadOwns();
556 mChan
->mCurrentTransaction
= mOldTransaction
;
560 MessageChannel
*mChan
;
561 int32_t mOldTransaction
;
564 // If a sync message times out, we store its sequence number here. Any
565 // future sync messages will fail immediately. Once the reply for original
566 // sync message is received, we allow sync messages again.
568 // When a message times out, nothing is done to inform the other side. The
569 // other side will eventually dispatch the message and send a reply. Our
570 // side is responsible for replying to all sync messages sent by the other
571 // side when it dispatches the timed out message. The response is always an
574 // A message is only timed out if it initiated a transaction. This avoids
575 // hitting a lot of corner cases with message nesting that we don't really
577 int32_t mTimedOutMessageSeqno
;
579 // If waiting for the reply to a sync out-message, it will be saved here
580 // on the I/O thread and then read and cleared by the worker thread.
581 nsAutoPtr
<Message
> mRecvd
;
583 // If a sync message reply that is an error arrives, we increment this
584 // counter rather than storing it in mRecvd.
587 // Queue of all incoming messages, except for replies to sync and urgent
588 // messages, which are delivered directly to mRecvd, and any pending urgent
589 // incall, which is stored in mPendingUrgentRequest.
591 // If both this side and the other side are functioning correctly, the queue
592 // can only be in certain configurations. Let
594 // |A<| be an async in-message,
595 // |S<| be a sync in-message,
596 // |C<| be an Interrupt in-call,
597 // |R<| be an Interrupt reply.
599 // The queue can only match this configuration
601 // A<* (S< | C< | R< (?{mInterruptStack.size() == 1} A<* (S< | C<)))
603 // The other side can send as many async messages |A<*| as it wants before
604 // sending us a blocking message.
606 // The first case is |S<|, a sync in-msg. The other side must be blocked,
607 // and thus can't send us any more messages until we process the sync
610 // The second case is |C<|, an Interrupt in-call; the other side must be blocked.
611 // (There's a subtlety here: this in-call might have raced with an
612 // out-call, but we detect that with the mechanism below,
613 // |mRemoteStackDepthGuess|, and races don't matter to the queue.)
615 // Final case, the other side replied to our most recent out-call |R<|.
616 // If that was the *only* out-call on our stack, |?{mInterruptStack.size() == 1}|,
617 // then other side "finished with us," and went back to its own business.
618 // That business might have included sending any number of async message
619 // |A<*| until sending a blocking message |(S< | C<)|. If we had more than
620 // one Interrupt call on our stack, the other side *better* not have sent us
621 // another blocking message, because it's blocked on a reply from us.
623 MessageQueue mPending
;
625 // Stack of all the out-calls on which this channel is awaiting responses.
626 // Each stack refers to a different protocol and the stacks are mutually
627 // exclusive: multiple outcalls of the same kind cannot be initiated while
628 // another is active.
629 std::stack
<Message
> mInterruptStack
;
631 // This is what we think the Interrupt stack depth is on the "other side" of this
632 // Interrupt channel. We maintain this variable so that we can detect racy Interrupt
633 // calls. With each Interrupt out-call sent, we send along what *we* think the
634 // stack depth of the remote side is *before* it will receive the Interrupt call.
636 // After sending the out-call, our stack depth is "incremented" by pushing
637 // that pending message onto mInterruptStack.
639 // Then when processing an in-call |c|, it must be true that
641 // mInterruptStack.size() == c.remoteDepth
643 // I.e., my depth is actually the same as what the other side thought it
644 // was when it sent in-call |c|. If this fails to hold, we have detected
645 // racy Interrupt calls.
647 // We then increment mRemoteStackDepthGuess *just before* processing the
648 // in-call, since we know the other side is waiting on it, and decrement
649 // it *just after* finishing processing that in-call, since our response
650 // will pop the top of the other side's |mInterruptStack|.
652 // One nice aspect of this race detection is that it is symmetric; if one
653 // side detects a race, then the other side must also detect the same race.
654 size_t mRemoteStackDepthGuess
;
656 // Approximation of code frames on the C++ stack. It can only be
657 // interpreted as the implication:
659 // !mCxxStackFrames.empty() => MessageChannel code on C++ stack
661 // This member is only accessed on the worker thread, and so is not
662 // protected by mMonitor. It is managed exclusively by the helper
663 // |class CxxStackFrame|.
664 mozilla::Vector
<InterruptFrame
> mCxxStackFrames
;
// Whether an Interrupt out-call was processed during this stack. Only
// meaningful in ExitedCxxStack(), which also resets it.
bool mSawInterruptOutMsg;
// Are we waiting on this channel for an incoming message? This is used
// to implement WaitForIncomingMessage(). Must only be accessed while owning
// mMonitor (AwaitingIncomingMessage() and AutoEnterWaitForIncoming assert
// this).
bool mIsWaitingForIncoming;
675 // Map of replies received "out of turn", because of Interrupt
676 // in-calls racing with replies to outstanding in-calls. See
677 // https://bugzilla.mozilla.org/show_bug.cgi?id=521929.
678 MessageMap mOutOfTurnReplies
;
680 // Stack of Interrupt in-calls that were deferred because of race
682 std::stack
<Message
> mDeferred
;
688 // Should the channel abort the process from the I/O thread when
689 // a channel error occurs?
692 // Should we prevent scripts from running while dispatching urgent messages?
695 // See SetChannelFlags
698 // Task and state used to asynchronously notify channel has been connected
699 // safely. This is necessary to be able to cancel notification if we are
700 // closed at the same time.
701 nsRefPtr
<RefCountedTask
> mOnChannelConnectedTask
;
702 DebugOnly
<bool> mPeerPidSet
;
707 ParentProcessIsBlocked();
710 } // namespace mozilla
712 #endif // ifndef ipc_glue_MessageChannel_h