chromeos: dbus: add Bluetooth properties support
[chromium-blink-merge.git] / content / common / webmessageportchannel_impl.cc
blob8e8f8c12cb4ed2ef9a3d22efdc7ddd7d8b7a4f0e
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/common/webmessageportchannel_impl.h"
7 #include "base/bind.h"
8 #include "content/common/child_process.h"
9 #include "content/common/child_thread.h"
10 #include "content/common/worker_messages.h"
11 #include "third_party/WebKit/Source/WebKit/chromium/public/platform/WebString.h"
12 #include "third_party/WebKit/Source/WebKit/chromium/public/WebMessagePortChannelClient.h"
14 using WebKit::WebMessagePortChannel;
15 using WebKit::WebMessagePortChannelArray;
16 using WebKit::WebMessagePortChannelClient;
17 using WebKit::WebString;
19 WebMessagePortChannelImpl::WebMessagePortChannelImpl()
20 : client_(NULL),
21 route_id_(MSG_ROUTING_NONE),
22 message_port_id_(MSG_ROUTING_NONE) {
23 AddRef();
24 Init();
27 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
28 int route_id,
29 int message_port_id)
30 : client_(NULL),
31 route_id_(route_id),
32 message_port_id_(message_port_id) {
33 AddRef();
34 Init();
37 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
38 // If we have any queued messages with attached ports, manually destroy them.
39 while (!message_queue_.empty()) {
40 const std::vector<WebMessagePortChannelImpl*>& channel_array =
41 message_queue_.front().ports;
42 for (size_t i = 0; i < channel_array.size(); i++) {
43 channel_array[i]->destroy();
45 message_queue_.pop();
48 if (message_port_id_ != MSG_ROUTING_NONE)
49 Send(new WorkerProcessHostMsg_DestroyMessagePort(message_port_id_));
51 if (route_id_ != MSG_ROUTING_NONE)
52 ChildThread::current()->RemoveRoute(route_id_);
55 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
56 // Must lock here since client_ is called on the main thread.
57 base::AutoLock auto_lock(lock_);
58 client_ = client;
61 void WebMessagePortChannelImpl::destroy() {
62 setClient(NULL);
64 // Release the object on the main thread, since the destructor might want to
65 // send an IPC, and that has to happen on the main thread.
66 ChildThread::current()->message_loop()->ReleaseSoon(FROM_HERE, this);
69 void WebMessagePortChannelImpl::entangle(WebMessagePortChannel* channel) {
70 // The message port ids might not be set up yet, if this channel wasn't
71 // created on the main thread. So need to wait until we're on the main thread
72 // before getting the other message port id.
73 scoped_refptr<WebMessagePortChannelImpl> webchannel(
74 static_cast<WebMessagePortChannelImpl*>(channel));
75 Entangle(webchannel);
78 void WebMessagePortChannelImpl::postMessage(
79 const WebString& message,
80 WebMessagePortChannelArray* channels) {
81 if (MessageLoop::current() != ChildThread::current()->message_loop()) {
82 ChildThread::current()->message_loop()->PostTask(
83 FROM_HERE,
84 base::Bind(&WebMessagePortChannelImpl::postMessage, this,
85 message, channels));
86 return;
89 std::vector<int> message_port_ids(channels ? channels->size() : 0);
90 if (channels) {
91 // Extract the port IDs from the source array, then free it.
92 for (size_t i = 0; i < channels->size(); ++i) {
93 WebMessagePortChannelImpl* webchannel =
94 static_cast<WebMessagePortChannelImpl*>((*channels)[i]);
95 message_port_ids[i] = webchannel->message_port_id();
96 webchannel->QueueMessages();
97 DCHECK(message_port_ids[i] != MSG_ROUTING_NONE);
99 delete channels;
102 IPC::Message* msg = new WorkerProcessHostMsg_PostMessage(
103 message_port_id_, message, message_port_ids);
104 Send(msg);
107 bool WebMessagePortChannelImpl::tryGetMessage(
108 WebString* message,
109 WebMessagePortChannelArray& channels) {
110 base::AutoLock auto_lock(lock_);
111 if (message_queue_.empty())
112 return false;
114 *message = message_queue_.front().message;
115 const std::vector<WebMessagePortChannelImpl*>& channel_array =
116 message_queue_.front().ports;
117 WebMessagePortChannelArray result_ports(channel_array.size());
118 for (size_t i = 0; i < channel_array.size(); i++) {
119 result_ports[i] = channel_array[i];
122 channels.swap(result_ports);
123 message_queue_.pop();
124 return true;
127 void WebMessagePortChannelImpl::Init() {
128 if (MessageLoop::current() != ChildThread::current()->message_loop()) {
129 ChildThread::current()->message_loop()->PostTask(
130 FROM_HERE,
131 base::Bind(&WebMessagePortChannelImpl::Init, this));
132 return;
135 if (route_id_ == MSG_ROUTING_NONE) {
136 DCHECK(message_port_id_ == MSG_ROUTING_NONE);
137 Send(new WorkerProcessHostMsg_CreateMessagePort(
138 &route_id_, &message_port_id_));
141 ChildThread::current()->AddRoute(route_id_, this);
144 void WebMessagePortChannelImpl::Entangle(
145 scoped_refptr<WebMessagePortChannelImpl> channel) {
146 if (MessageLoop::current() != ChildThread::current()->message_loop()) {
147 ChildThread::current()->message_loop()->PostTask(
148 FROM_HERE,
149 base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel));
150 return;
153 Send(new WorkerProcessHostMsg_Entangle(
154 message_port_id_, channel->message_port_id()));
157 void WebMessagePortChannelImpl::QueueMessages() {
158 if (MessageLoop::current() != ChildThread::current()->message_loop()) {
159 ChildThread::current()->message_loop()->PostTask(
160 FROM_HERE,
161 base::Bind(&WebMessagePortChannelImpl::QueueMessages, this));
162 return;
164 // This message port is being sent elsewhere (perhaps to another process).
165 // The new endpoint needs to receive the queued messages, including ones that
166 // could still be in-flight. So we tell the browser to queue messages, and it
167 // sends us an ack, whose receipt we know means that no more messages are
168 // in-flight. We then send the queued messages to the browser, which prepends
169 // them to the ones it queued and it sends them to the new endpoint.
170 Send(new WorkerProcessHostMsg_QueueMessages(message_port_id_));
172 // The process could potentially go away while we're still waiting for
173 // in-flight messages. Ensure it stays alive.
174 ChildProcess::current()->AddRefProcess();
177 void WebMessagePortChannelImpl::Send(IPC::Message* message) {
178 if (MessageLoop::current() != ChildThread::current()->message_loop()) {
179 DCHECK(!message->is_sync());
180 ChildThread::current()->message_loop()->PostTask(
181 FROM_HERE,
182 base::Bind(&WebMessagePortChannelImpl::Send, this, message));
183 return;
186 ChildThread::current()->Send(message);
189 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
190 bool handled = true;
191 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
192 IPC_MESSAGE_HANDLER(WorkerProcessMsg_Message, OnMessage)
193 IPC_MESSAGE_HANDLER(WorkerProcessMsg_MessagesQueued, OnMessagedQueued)
194 IPC_MESSAGE_UNHANDLED(handled = false)
195 IPC_END_MESSAGE_MAP()
196 return handled;
199 void WebMessagePortChannelImpl::OnMessage(
200 const string16& message,
201 const std::vector<int>& sent_message_port_ids,
202 const std::vector<int>& new_routing_ids) {
203 base::AutoLock auto_lock(lock_);
204 Message msg;
205 msg.message = message;
206 if (!sent_message_port_ids.empty()) {
207 msg.ports.resize(sent_message_port_ids.size());
208 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
209 msg.ports[i] = new WebMessagePortChannelImpl(
210 new_routing_ids[i], sent_message_port_ids[i]);
214 bool was_empty = message_queue_.empty();
215 message_queue_.push(msg);
216 if (client_ && was_empty)
217 client_->messageAvailable();
220 void WebMessagePortChannelImpl::OnMessagedQueued() {
221 std::vector<QueuedMessage> queued_messages;
224 base::AutoLock auto_lock(lock_);
225 queued_messages.reserve(message_queue_.size());
226 while (!message_queue_.empty()) {
227 string16 message = message_queue_.front().message;
228 const std::vector<WebMessagePortChannelImpl*>& channel_array =
229 message_queue_.front().ports;
230 std::vector<int> port_ids(channel_array.size());
231 for (size_t i = 0; i < channel_array.size(); ++i) {
232 port_ids[i] = channel_array[i]->message_port_id();
234 queued_messages.push_back(std::make_pair(message, port_ids));
235 message_queue_.pop();
239 Send(new WorkerProcessHostMsg_SendQueuedMessages(
240 message_port_id_, queued_messages));
242 message_port_id_ = MSG_ROUTING_NONE;
244 Release();
245 ChildProcess::current()->ReleaseProcess();
248 WebMessagePortChannelImpl::Message::Message() {}
250 WebMessagePortChannelImpl::Message::~Message() {}