[DevTools] Implement DevToolsManager::Observer which notifies about target updates.
[chromium-blink-merge.git] / mojo / spy / spy.cc
blob8de82290bf65a300fcce2e0f1117137ba5f8db46
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/spy/spy.h"
7 #include <vector>
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/strings/string_number_conversions.h"
16 #include "base/strings/string_split.h"
17 #include "base/threading/thread.h"
18 #include "base/threading/worker_pool.h"
19 #include "base/time/time.h"
20 #include "mojo/application_manager/application_manager.h"
21 #include "mojo/public/cpp/system/core.h"
22 #include "mojo/spy/common.h"
23 #include "mojo/spy/public/spy.mojom.h"
24 #include "mojo/spy/spy_server_impl.h"
25 #include "mojo/spy/websocket_server.h"
26 #include "url/gurl.h"
28 namespace {
30 mojo::WebSocketServer* ws_server = NULL;
32 const size_t kMessageBufSize = 2 * 1024;
33 const size_t kHandleBufSize = 64;
34 const int kDefaultWebSocketPort = 42424;
36 void CloseHandles(MojoHandle* handles, size_t count) {
37 for (size_t ix = 0; ix != count; ++count)
38 MojoClose(handles[ix]);
41 // In charge of processing messages that flow over a
42 // single message pipe.
43 class MessageProcessor :
44 public base::RefCountedThreadSafe<MessageProcessor> {
45 public:
46 MessageProcessor(base::MessageLoopProxy* control_loop_proxy)
47 : last_result_(MOJO_RESULT_OK),
48 bytes_transfered_(0),
49 control_loop_proxy_(control_loop_proxy) {
50 message_count_[0] = 0;
51 message_count_[1] = 0;
52 handle_count_[0] = 0;
53 handle_count_[1] = 0;
56 void Start(mojo::ScopedMessagePipeHandle client,
57 mojo::ScopedMessagePipeHandle interceptor,
58 const GURL& url) {
59 std::vector<mojo::MessagePipeHandle> pipes;
60 pipes.push_back(client.get());
61 pipes.push_back(interceptor.get());
62 std::vector<MojoHandleSignals> handle_signals;
63 handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
64 handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
66 scoped_ptr<char[]> mbuf(new char[kMessageBufSize]);
67 scoped_ptr<MojoHandle[]> hbuf(new MojoHandle[kHandleBufSize]);
69 // Main processing loop:
70 // 1- Wait for an endpoint to have a message.
71 // 2- Read the message
72 // 3- Log data
73 // 4- Wait until the opposite port is ready for writting
74 // 4- Write the message to opposite port.
76 for (;;) {
77 int r = WaitMany(pipes, handle_signals, MOJO_DEADLINE_INDEFINITE);
78 if ((r < 0) || (r > 1)) {
79 last_result_ = r;
80 break;
83 uint32_t bytes_read = kMessageBufSize;
84 uint32_t handles_read = kHandleBufSize;
86 if (!CheckResult(ReadMessageRaw(pipes[r],
87 mbuf.get(), &bytes_read,
88 hbuf.get(), &handles_read,
89 MOJO_READ_MESSAGE_FLAG_NONE)))
90 break;
92 if (!bytes_read && !handles_read)
93 continue;
95 if (handles_read) {
96 handle_count_[r] += handles_read;
98 // Intercept message pipes which are returned via the ReadMessageRaw
99 // call
100 for (uint32_t i = 0; i < handles_read; i++) {
101 // Hack to determine if a handle is a message pipe.
102 // TODO(ananta)
103 // We should have an API which given a handle returns additional
104 // information about the handle which includes its type, etc.
105 if (MojoReadMessage(hbuf[i], NULL, NULL, NULL, NULL,
106 MOJO_READ_MESSAGE_FLAG_NONE) !=
107 MOJO_RESULT_INVALID_ARGUMENT) {
108 mojo::ScopedMessagePipeHandle message_pipe_handle;
109 message_pipe_handle.reset(mojo::MessagePipeHandle(hbuf[i]));
111 mojo::ScopedMessagePipeHandle faux_client;
112 mojo::ScopedMessagePipeHandle interceptor;
113 CreateMessagePipe(NULL, &faux_client, &interceptor);
115 base::WorkerPool::PostTask(
116 FROM_HERE,
117 base::Bind(&MessageProcessor::Start,
118 this,
119 base::Passed(&message_pipe_handle),
120 base::Passed(&interceptor),
121 url),
122 true);
123 hbuf.get()[i] = faux_client.release().value();
127 ++message_count_[r];
128 bytes_transfered_ += bytes_read;
130 LogMessageInfo(mbuf.get(), url);
132 mojo::MessagePipeHandle write_handle = (r == 0) ? pipes[1] : pipes[0];
133 if (!CheckResult(Wait(write_handle,
134 MOJO_HANDLE_SIGNAL_WRITABLE,
135 MOJO_DEADLINE_INDEFINITE)))
136 break;
138 if (!CheckResult(WriteMessageRaw(write_handle,
139 mbuf.get(), bytes_read,
140 hbuf.get(), handles_read,
141 MOJO_WRITE_MESSAGE_FLAG_NONE))) {
142 // On failure we own the handles. For now just close them.
143 if (handles_read)
144 CloseHandles(hbuf.get(), handles_read);
145 break;
150 private:
151 friend class base::RefCountedThreadSafe<MessageProcessor>;
152 virtual ~MessageProcessor() {}
154 bool CheckResult(MojoResult mr) {
155 if (mr == MOJO_RESULT_OK)
156 return true;
157 last_result_ = mr;
158 return false;
161 void LogInvalidMessage(const mojo::MojoMessageHeader& header) {
162 LOG(ERROR) << "Invalid message: Number of Fields: "
163 << header.num_fields
164 << " Number of bytes: "
165 << header.num_bytes
166 << " Flags: "
167 << header.flags;
170 // Validates the message as per the mojo spec.
171 bool IsValidMessage(const mojo::MojoMessageHeader& header) {
172 if (header.num_fields == 2) {
173 if (header.num_bytes != sizeof(mojo::MojoMessageHeader)) {
174 LogInvalidMessage(header);
175 return false;
177 } else if (header.num_fields == 3) {
178 if (header.num_bytes != sizeof(mojo::MojoRequestHeader)) {
179 LogInvalidMessage(header);
181 } else if (header.num_fields > 3) {
182 if (header.num_bytes < sizeof(mojo::MojoRequestHeader)) {
183 LogInvalidMessage(header);
184 return false;
187 // These flags should be specified in request or response messages.
188 if (header.num_fields < 3 &&
189 ((header.flags & mojo::kMessageExpectsResponse) ||
190 (header.flags & mojo::kMessageIsResponse))) {
191 LOG(ERROR) << "Invalid request message.";
192 LogInvalidMessage(header);
193 return false;
195 // These flags are mutually exclusive.
196 if ((header.flags & mojo::kMessageExpectsResponse) &&
197 (header.flags & mojo::kMessageIsResponse)) {
198 LOG(ERROR) << "Invalid flags combination in request message.";
199 LogInvalidMessage(header);
200 return false;
202 return true;
205 void LogMessageInfo(void* data, const GURL& url) {
206 mojo::MojoMessageData* message_data =
207 reinterpret_cast<mojo::MojoMessageData*>(data);
208 if (IsValidMessage(message_data->header)) {
209 control_loop_proxy_->PostTask(
210 FROM_HERE,
211 base::Bind(&mojo::WebSocketServer::LogMessageInfo,
212 base::Unretained(ws_server),
213 message_data->header, url, base::Time::Now()));
217 MojoResult last_result_;
218 uint32_t bytes_transfered_;
219 uint32_t message_count_[2];
220 uint32_t handle_count_[2];
221 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
224 // In charge of intercepting access to the service manager.
225 class SpyInterceptor : public mojo::ApplicationManager::Interceptor {
226 public:
227 explicit SpyInterceptor(
228 scoped_refptr<mojo::SpyServerImpl> spy_server,
229 const scoped_refptr<base::MessageLoopProxy>& control_loop_proxy)
230 : spy_server_(spy_server),
231 proxy_(base::MessageLoopProxy::current()),
232 control_loop_proxy_(control_loop_proxy) {}
234 private:
235 virtual mojo::ServiceProviderPtr OnConnectToClient(
236 const GURL& url, mojo::ServiceProviderPtr real_client) OVERRIDE {
237 if (!MustIntercept(url))
238 return real_client.Pass();
240 // You can get an invalid handle if the app (or service) is
241 // created by unconventional means, for example the command line.
242 if (!real_client)
243 return real_client.Pass();
245 mojo::ScopedMessagePipeHandle faux_client;
246 mojo::ScopedMessagePipeHandle interceptor;
247 CreateMessagePipe(NULL, &faux_client, &interceptor);
249 scoped_refptr<MessageProcessor> processor =
250 new MessageProcessor(control_loop_proxy_.get());
251 mojo::ScopedMessagePipeHandle real_handle = real_client.PassMessagePipe();
252 base::WorkerPool::PostTask(
253 FROM_HERE,
254 base::Bind(&MessageProcessor::Start,
255 processor,
256 base::Passed(&real_handle), base::Passed(&interceptor),
257 url),
258 true);
260 mojo::ServiceProviderPtr faux_provider;
261 faux_provider.Bind(faux_client.Pass());
262 return faux_provider.Pass();
265 bool MustIntercept(const GURL& url) {
266 // TODO(cpu): manage who and when to intercept.
267 proxy_->PostTask(
268 FROM_HERE,
269 base::Bind(&mojo::SpyServerImpl::OnIntercept, spy_server_, url));
270 return true;
273 scoped_refptr<mojo::SpyServerImpl> spy_server_;
274 scoped_refptr<base::MessageLoopProxy> proxy_;
275 scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
278 void StartWebServer(int port, mojo::ScopedMessagePipeHandle pipe) {
279 // TODO(cpu) figure out lifetime of the server. See Spy() dtor.
280 ws_server = new mojo::WebSocketServer(port, pipe.Pass());
281 ws_server->Start();
284 struct SpyOptions {
285 int websocket_port;
287 SpyOptions()
288 : websocket_port(kDefaultWebSocketPort) {
292 SpyOptions ProcessOptions(const std::string& options) {
293 SpyOptions spy_options;
294 if (options.empty())
295 return spy_options;
296 base::StringPairs kv_pairs;
297 base::SplitStringIntoKeyValuePairs(options, ':', ',', &kv_pairs);
298 base::StringPairs::iterator it = kv_pairs.begin();
299 for (; it != kv_pairs.end(); ++it) {
300 if (it->first == "port") {
301 int port;
302 if (base::StringToInt(it->second, &port))
303 spy_options.websocket_port = port;
306 return spy_options;
309 } // namespace
311 namespace mojo {
313 Spy::Spy(mojo::ApplicationManager* application_manager,
314 const std::string& options) {
315 SpyOptions spy_options = ProcessOptions(options);
317 spy_server_ = new SpyServerImpl();
319 // Start the tread what will accept commands from the frontend.
320 control_thread_.reset(new base::Thread("mojo_spy_control_thread"));
321 base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0);
322 control_thread_->StartWithOptions(thread_options);
323 control_thread_->message_loop_proxy()->PostTask(
324 FROM_HERE, base::Bind(&StartWebServer,
325 spy_options.websocket_port,
326 base::Passed(spy_server_->ServerPipe())));
328 // Start intercepting mojo services.
329 application_manager->SetInterceptor(
330 new SpyInterceptor(spy_server_, control_thread_->message_loop_proxy()));
333 Spy::~Spy() {
334 // TODO(cpu): Do not leak the interceptor. Lifetime between the
335 // application_manager and the spy is still unclear hence the leak.
338 } // namespace mojo