Create dedicate crate for php_escaping.rs
[hiphop-php.git] / hphp / runtime / server / xbox-server.cpp
blob707b681bfa488cf1b45fced367ac4f63efe6188c
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #include "hphp/runtime/server/xbox-server.h"
18 #include "hphp/runtime/base/builtin-functions.h"
19 #include "hphp/runtime/base/comparisons.h"
20 #include "hphp/runtime/base/runtime-option.h"
21 #include "hphp/runtime/server/rpc-request-handler.h"
22 #include "hphp/runtime/server/satellite-server.h"
23 #include "hphp/runtime/base/libevent-http-client.h"
24 #include "hphp/runtime/server/job-queue-vm-stack.h"
25 #include "hphp/runtime/server/server-task-event.h"
26 #include "hphp/util/job-queue.h"
27 #include "hphp/util/lock.h"
28 #include "hphp/util/logger.h"
29 #include "hphp/util/timer.h"
30 #include "hphp/system/systemlib.h"
32 namespace HPHP {
33 ///////////////////////////////////////////////////////////////////////////////
35 using std::string;
37 XboxTransport::XboxTransport(const folly::StringPiece message,
38 const folly::StringPiece reqInitDoc /* = "" */)
39 : m_refCount(0), m_done(false), m_code(0), m_event(nullptr) {
40 Timer::GetMonotonicTime(m_queueTime);
42 m_message.append(message.data(), message.size());
43 m_reqInitDoc.append(reqInitDoc.data(), reqInitDoc.size());
44 disableCompression(); // so we don't have to decompress during sendImpl()
47 const char *XboxTransport::getUrl() {
48 if (!m_reqInitDoc.empty()) {
49 return "xbox_process_call_message";
51 return RuntimeOption::XboxProcessMessageFunc.c_str();
54 std::string XboxTransport::getHeader(const char *name) {
55 if (!strcasecmp(name, "Host")) return m_host;
56 if (!strcasecmp(name, "ReqInitDoc")) return m_reqInitDoc;
57 return "";
60 void XboxTransport::sendImpl(const void* data, int size, int code,
61 bool /*chunked*/, bool eom) {
62 m_response.append((const char*)data, size);
63 if (code) {
64 m_code = code;
66 if (eom) {
67 onSendEndImpl();
71 void XboxTransport::onSendEndImpl() {
72 Lock lock(this);
73 if (m_done) {
74 return;
76 m_done = true;
77 if (m_event) {
78 m_event->finish();
80 notify();
83 String XboxTransport::getResults(int &code, int timeout_ms /* = 0 */) {
85 Lock lock(this);
86 while (!m_done) {
87 if (timeout_ms > 0) {
88 long long seconds = timeout_ms / 1000;
89 long long nanosecs = (timeout_ms - seconds * 1000) * 1000;
90 if (!wait(seconds, nanosecs)) {
91 code = -1;
92 return empty_string();
94 } else {
95 wait();
100 String response(m_response.c_str(), m_response.size(), CopyString);
101 code = m_code;
103 return response;
106 ///////////////////////////////////////////////////////////////////////////////
108 static THREAD_LOCAL(std::shared_ptr<XboxServerInfo>, s_xbox_server_info);
109 static THREAD_LOCAL(std::string, s_xbox_prev_req_init_doc);
111 struct XboxRequestHandler : RPCRequestHandler {
112 XboxRequestHandler() : RPCRequestHandler(
113 (*s_xbox_server_info)->getTimeoutSeconds().count(), Info) {}
114 static bool Info;
117 bool XboxRequestHandler::Info = false;
119 static THREAD_LOCAL(XboxRequestHandler, s_xbox_request_handler);
121 ///////////////////////////////////////////////////////////////////////////////
123 struct XboxWorker
124 : JobQueueWorker<XboxTransport*,Server*,true,false,JobQueueDropVMStack>
126 void doJob(XboxTransport *job) override {
127 try {
128 // If this job or the previous job that ran on this thread have
129 // a custom initial document, make sure we do a reset
130 string reqInitDoc = job->getHeader("ReqInitDoc");
131 *s_xbox_prev_req_init_doc = reqInitDoc;
133 job->onRequestStart(job->getStartTimer());
134 createRequestHandler()->run(job);
135 destroyRequestHandler();
136 job->decRefCount();
137 } catch (...) {
138 Logger::Error("RpcRequestHandler leaked exceptions");
141 private:
142 RequestHandler *createRequestHandler() {
143 if (!*s_xbox_server_info) {
144 *s_xbox_server_info = std::make_shared<XboxServerInfo>();
146 if (RuntimeOption::XboxServerLogInfo) XboxRequestHandler::Info = true;
147 s_xbox_request_handler->setServerInfo(*s_xbox_server_info);
148 s_xbox_request_handler->setReturnEncodeType(
149 RPCRequestHandler::ReturnEncodeType::Serialize);
150 return s_xbox_request_handler.get();
153 void destroyRequestHandler() {
154 if (!s_xbox_request_handler.isNull()) {
155 s_xbox_request_handler.destroy();
159 void onThreadExit() override {
160 if (!s_xbox_request_handler.isNull()) {
161 s_xbox_request_handler.destroy();
166 ///////////////////////////////////////////////////////////////////////////////
168 static JobQueueDispatcher<XboxWorker> *s_dispatcher;
169 static Mutex s_dispatchMutex;
171 void XboxServer::Restart() {
172 Stop();
174 if (RuntimeOption::XboxServerThreadCount > 0) {
176 Lock l(s_dispatchMutex);
177 s_dispatcher = new JobQueueDispatcher<XboxWorker>
178 (RuntimeOption::XboxServerThreadCount,
179 RuntimeOption::XboxServerThreadCount,
180 RuntimeOption::ServerThreadDropCacheTimeoutSeconds,
181 RuntimeOption::ServerThreadDropStack,
182 nullptr);
184 if (RuntimeOption::XboxServerLogInfo) {
185 Logger::Info("xbox server started");
187 s_dispatcher->start();
191 void XboxServer::Stop() {
192 if (!s_dispatcher) return;
194 Lock l(s_dispatchMutex);
195 if (!s_dispatcher) return;
197 s_dispatcher->stop();
198 delete s_dispatcher;
199 s_dispatcher = nullptr;
202 ///////////////////////////////////////////////////////////////////////////////
204 const StaticString
205 s_code("code"),
206 s_response("response"),
207 s_error("error"),
208 s_localhost("localhost"),
209 s_127_0_0_1("127.0.0.1");
211 static bool isLocalHost(const String& host) {
212 return host.empty() || host == s_localhost || host == s_127_0_0_1;
215 bool XboxServer::SendMessage(const String& message,
216 Array& ret,
217 int timeout_ms,
218 const String& host /* = "localhost" */) {
219 if (isLocalHost(host)) {
220 XboxTransport *job;
222 Lock l(s_dispatchMutex);
223 if (!s_dispatcher) {
224 return false;
227 job = new XboxTransport(message.toCppString());
228 job->incRefCount(); // paired with worker's decRefCount()
229 job->incRefCount(); // paired with decRefCount() at below
230 assertx(s_dispatcher);
231 s_dispatcher->enqueue(job);
234 if (timeout_ms <= 0) {
235 timeout_ms = RuntimeOption::XboxDefaultLocalTimeoutMilliSeconds;
238 int code = 0;
239 String response = job->getResults(code, timeout_ms);
240 job->decRefCount(); // i'm done with this job
242 if (code > 0) {
243 ret.set(s_code, code);
244 if (code == 200) {
245 ret.set(
246 s_response,
247 unserialize_from_string(
248 response,
249 VariableUnserializer::Type::Internal
252 } else {
253 ret.set(s_error, response);
255 return true;
258 } else { // remote
260 string url = "http://";
261 url += host.data();
262 url += '/';
263 url += RuntimeOption::XboxProcessMessageFunc;
265 int timeoutSeconds = timeout_ms / 1000;
266 if (timeoutSeconds <= 0) {
267 timeoutSeconds = RuntimeOption::XboxDefaultRemoteTimeoutSeconds;
270 string hostStr(host.data());
271 std::vector<std::string> headers;
272 LibEventHttpClientPtr http =
273 LibEventHttpClient::Get(hostStr, RuntimeOption::XboxServerPort);
274 if (http->send(url, headers, timeoutSeconds, false,
275 message.data(), message.size())) {
276 int code = http->getCode();
277 if (code > 0) {
278 int len = 0;
279 char *response = http->recv(len);
280 String sresponse(response, len, AttachString);
281 ret.set(s_code, code);
282 if (code == 200) {
283 ret.set(
284 s_response,
285 unserialize_from_string(
286 sresponse,
287 VariableUnserializer::Type::Internal
290 } else {
291 ret.set(s_error, sresponse);
293 return true;
295 // code wasn't correctly set by http client, treat it as not found
296 ret.set(s_code, 404);
297 ret.set(s_error, "http client failed");
301 return false;
304 bool XboxServer::PostMessage(const String& message,
305 const String& host /* = "localhost" */) {
306 if (isLocalHost(host)) {
307 Lock l(s_dispatchMutex);
308 if (!s_dispatcher) {
309 return false;
312 XboxTransport *job = new XboxTransport(message.toCppString());
313 job->incRefCount(); // paired with worker's decRefCount()
314 assertx(s_dispatcher);
315 s_dispatcher->enqueue(job);
316 return true;
318 } else { // remote
320 string url = "http://";
321 url += host.data();
322 url += "/xbox_post_message";
324 std::vector<std::string> headers;
325 std::string hostStr(host.data());
326 LibEventHttpClientPtr http =
327 LibEventHttpClient::Get(hostStr, RuntimeOption::XboxServerPort);
328 if (http->send(url, headers, 0, false, message.data(), message.size())) {
329 int code = http->getCode();
330 if (code > 0) {
331 int len = 0;
332 char *response = http->recv(len);
333 String sresponse(response, len, AttachString);
334 if (code == 200 &&
335 same(
336 unserialize_from_string(
337 sresponse,
338 VariableUnserializer::Type::Internal
340 true
343 return true;
349 return false;
352 ///////////////////////////////////////////////////////////////////////////////
354 struct XboxTask : SweepableResourceData {
355 DECLARE_RESOURCE_ALLOCATION(XboxTask)
357 XboxTask(const String& message, const String& reqInitDoc = "") {
358 m_job = new XboxTransport(message.toCppString(), reqInitDoc.toCppString());
359 m_job->incRefCount();
362 ~XboxTask() override {
363 m_job->decRefCount();
366 XboxTransport *getJob() { return m_job;}
368 CLASSNAME_IS("XboxTask");
369 // overriding ResourceData
370 const String& o_getClassNameHook() const override { return classnameof(); }
372 private:
373 XboxTransport *m_job;
375 IMPLEMENT_RESOURCE_ALLOCATION(XboxTask)
377 ///////////////////////////////////////////////////////////////////////////////
379 Resource XboxServer::TaskStart(const String& msg,
380 const String& reqInitDoc /* = "" */,
381 ServerTaskEvent<XboxServer, XboxTransport> *event /* = nullptr */) {
383 Lock l(s_dispatchMutex);
384 if (s_dispatcher &&
385 (s_dispatcher->getActiveWorker() <
386 RuntimeOption::XboxServerThreadCount ||
387 s_dispatcher->getQueuedJobs() <
388 RuntimeOption::XboxServerMaxQueueLength)) {
389 auto task = req::make<XboxTask>(msg, reqInitDoc);
390 XboxTransport *job = task->getJob();
391 job->incRefCount(); // paired with worker's decRefCount()
393 Transport *transport = g_context->getTransport();
394 if (transport) {
395 job->setHost(transport->getHeader("Host"));
398 if (event) {
399 job->setAsioEvent(event);
400 event->setJob(job);
403 assertx(s_dispatcher);
404 s_dispatcher->enqueue(job);
406 return Resource(std::move(task));
409 const char* errMsg =
410 (RuntimeOption::XboxServerThreadCount > 0 ?
411 "Cannot create new Xbox task because the Xbox queue has "
412 "reached maximum capacity" :
413 "Cannot create new Xbox task because the Xbox is not enabled");
415 throw_exception(SystemLib::AllocExceptionObject(errMsg));
416 return Resource();
419 void XboxServer::TaskStartFromNonRequest(
420 const folly::StringPiece msg,
421 const folly::StringPiece reqInitDoc /* ="" */
424 Lock l(s_dispatchMutex);
425 if (s_dispatcher &&
426 (s_dispatcher->getActiveWorker() <
427 RuntimeOption::XboxServerThreadCount ||
428 s_dispatcher->getQueuedJobs() <
429 RuntimeOption::XboxServerMaxQueueLength)) {
430 XboxTransport *job = new XboxTransport(msg, reqInitDoc);
431 job->incRefCount(); // paired with worker's decRefCount()
433 assertx(s_dispatcher);
434 s_dispatcher->enqueue(job);
435 return;
438 const char* errMsg =
439 (RuntimeOption::XboxServerThreadCount > 0 ?
440 "Cannot create new Xbox task because the Xbox queue has "
441 "reached maximum capacity" :
442 "Cannot create new Xbox task because the Xbox is not enabled");
444 throw std::runtime_error(errMsg);
447 bool XboxServer::TaskStatus(const Resource& task) {
448 return cast<XboxTask>(task)->getJob()->isDone();
451 int XboxServer::TaskResult(const Resource& task, int timeout_ms, Variant *ret) {
452 return TaskResult(cast<XboxTask>(task)->getJob(), timeout_ms, ret);
455 int XboxServer::TaskResult(XboxTransport *job, int timeout_ms, Variant *ret) {
456 int code = 0;
457 String response = job->getResults(code, timeout_ms);
458 if (ret) {
459 if (code == 200) {
460 *ret =
461 unserialize_from_string(response, VariableUnserializer::Type::Internal);
462 } else {
463 *ret = response;
466 return code;
469 std::shared_ptr<XboxServerInfo> XboxServer::GetServerInfo() {
470 return *s_xbox_server_info;
473 RPCRequestHandler *XboxServer::GetRequestHandler() {
474 if (s_xbox_request_handler.isNull()) return nullptr;
475 return s_xbox_request_handler.get();
478 ///////////////////////////////////////////////////////////////////////////////