2 +----------------------------------------------------------------------+
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #include "hphp/runtime/server/xbox-server.h"
18 #include "hphp/runtime/base/builtin-functions.h"
19 #include "hphp/runtime/base/comparisons.h"
20 #include "hphp/runtime/base/runtime-option.h"
21 #include "hphp/runtime/server/rpc-request-handler.h"
22 #include "hphp/runtime/server/satellite-server.h"
23 #include "hphp/runtime/base/libevent-http-client.h"
24 #include "hphp/runtime/server/job-queue-vm-stack.h"
25 #include "hphp/runtime/server/server-task-event.h"
26 #include "hphp/util/job-queue.h"
27 #include "hphp/util/lock.h"
28 #include "hphp/util/logger.h"
29 #include "hphp/util/timer.h"
30 #include "hphp/system/systemlib.h"
33 ///////////////////////////////////////////////////////////////////////////////
37 XboxTransport::XboxTransport(const folly::StringPiece message
,
38 const folly::StringPiece reqInitDoc
/* = "" */)
39 : m_refCount(0), m_done(false), m_code(0), m_event(nullptr) {
40 Timer::GetMonotonicTime(m_queueTime
);
42 m_message
.append(message
.data(), message
.size());
43 m_reqInitDoc
.append(reqInitDoc
.data(), reqInitDoc
.size());
44 disableCompression(); // so we don't have to decompress during sendImpl()
47 const char *XboxTransport::getUrl() {
48 if (!m_reqInitDoc
.empty()) {
49 return "xbox_process_call_message";
51 return RuntimeOption::XboxProcessMessageFunc
.c_str();
54 std::string
XboxTransport::getHeader(const char *name
) {
55 if (!strcasecmp(name
, "Host")) return m_host
;
56 if (!strcasecmp(name
, "ReqInitDoc")) return m_reqInitDoc
;
60 void XboxTransport::sendImpl(const void* data
, int size
, int code
,
61 bool /*chunked*/, bool eom
) {
62 m_response
.append((const char*)data
, size
);
71 void XboxTransport::onSendEndImpl() {
83 String
XboxTransport::getResults(int &code
, int timeout_ms
/* = 0 */) {
88 long long seconds
= timeout_ms
/ 1000;
89 long long nanosecs
= (timeout_ms
- seconds
* 1000) * 1000;
90 if (!wait(seconds
, nanosecs
)) {
92 return empty_string();
100 String
response(m_response
.c_str(), m_response
.size(), CopyString
);
106 ///////////////////////////////////////////////////////////////////////////////
108 static THREAD_LOCAL(std::shared_ptr
<XboxServerInfo
>, s_xbox_server_info
);
109 static THREAD_LOCAL(std::string
, s_xbox_prev_req_init_doc
);
111 struct XboxRequestHandler
: RPCRequestHandler
{
112 XboxRequestHandler() : RPCRequestHandler(
113 (*s_xbox_server_info
)->getTimeoutSeconds().count(), Info
) {}
117 bool XboxRequestHandler::Info
= false;
119 static THREAD_LOCAL(XboxRequestHandler
, s_xbox_request_handler
);
121 ///////////////////////////////////////////////////////////////////////////////
124 : JobQueueWorker
<XboxTransport
*,Server
*,true,false,JobQueueDropVMStack
>
126 void doJob(XboxTransport
*job
) override
{
128 // If this job or the previous job that ran on this thread have
129 // a custom initial document, make sure we do a reset
130 string reqInitDoc
= job
->getHeader("ReqInitDoc");
131 *s_xbox_prev_req_init_doc
= reqInitDoc
;
133 job
->onRequestStart(job
->getStartTimer());
134 createRequestHandler()->run(job
);
135 destroyRequestHandler();
138 Logger::Error("RpcRequestHandler leaked exceptions");
142 RequestHandler
*createRequestHandler() {
143 if (!*s_xbox_server_info
) {
144 *s_xbox_server_info
= std::make_shared
<XboxServerInfo
>();
146 if (RuntimeOption::XboxServerLogInfo
) XboxRequestHandler::Info
= true;
147 s_xbox_request_handler
->setServerInfo(*s_xbox_server_info
);
148 s_xbox_request_handler
->setReturnEncodeType(
149 RPCRequestHandler::ReturnEncodeType::Serialize
);
150 return s_xbox_request_handler
.get();
153 void destroyRequestHandler() {
154 if (!s_xbox_request_handler
.isNull()) {
155 s_xbox_request_handler
.destroy();
159 void onThreadExit() override
{
160 if (!s_xbox_request_handler
.isNull()) {
161 s_xbox_request_handler
.destroy();
166 ///////////////////////////////////////////////////////////////////////////////
168 static JobQueueDispatcher
<XboxWorker
> *s_dispatcher
;
169 static Mutex s_dispatchMutex
;
171 void XboxServer::Restart() {
174 if (RuntimeOption::XboxServerThreadCount
> 0) {
176 Lock
l(s_dispatchMutex
);
177 s_dispatcher
= new JobQueueDispatcher
<XboxWorker
>
178 (RuntimeOption::XboxServerThreadCount
,
179 RuntimeOption::XboxServerThreadCount
,
180 RuntimeOption::ServerThreadDropCacheTimeoutSeconds
,
181 RuntimeOption::ServerThreadDropStack
,
184 if (RuntimeOption::XboxServerLogInfo
) {
185 Logger::Info("xbox server started");
187 s_dispatcher
->start();
191 void XboxServer::Stop() {
192 if (!s_dispatcher
) return;
194 Lock
l(s_dispatchMutex
);
195 if (!s_dispatcher
) return;
197 s_dispatcher
->stop();
199 s_dispatcher
= nullptr;
202 ///////////////////////////////////////////////////////////////////////////////
206 s_response("response"),
208 s_localhost("localhost"),
209 s_127_0_0_1("127.0.0.1");
211 static bool isLocalHost(const String
& host
) {
212 return host
.empty() || host
== s_localhost
|| host
== s_127_0_0_1
;
215 bool XboxServer::SendMessage(const String
& message
,
218 const String
& host
/* = "localhost" */) {
219 if (isLocalHost(host
)) {
222 Lock
l(s_dispatchMutex
);
227 job
= new XboxTransport(message
.toCppString());
228 job
->incRefCount(); // paired with worker's decRefCount()
229 job
->incRefCount(); // paired with decRefCount() at below
230 assertx(s_dispatcher
);
231 s_dispatcher
->enqueue(job
);
234 if (timeout_ms
<= 0) {
235 timeout_ms
= RuntimeOption::XboxDefaultLocalTimeoutMilliSeconds
;
239 String response
= job
->getResults(code
, timeout_ms
);
240 job
->decRefCount(); // i'm done with this job
243 ret
.set(s_code
, code
);
247 unserialize_from_string(
249 VariableUnserializer::Type::Internal
253 ret
.set(s_error
, response
);
260 string url
= "http://";
263 url
+= RuntimeOption::XboxProcessMessageFunc
;
265 int timeoutSeconds
= timeout_ms
/ 1000;
266 if (timeoutSeconds
<= 0) {
267 timeoutSeconds
= RuntimeOption::XboxDefaultRemoteTimeoutSeconds
;
270 string
hostStr(host
.data());
271 std::vector
<std::string
> headers
;
272 LibEventHttpClientPtr http
=
273 LibEventHttpClient::Get(hostStr
, RuntimeOption::XboxServerPort
);
274 if (http
->send(url
, headers
, timeoutSeconds
, false,
275 message
.data(), message
.size())) {
276 int code
= http
->getCode();
279 char *response
= http
->recv(len
);
280 String
sresponse(response
, len
, AttachString
);
281 ret
.set(s_code
, code
);
285 unserialize_from_string(
287 VariableUnserializer::Type::Internal
291 ret
.set(s_error
, sresponse
);
295 // code wasn't correctly set by http client, treat it as not found
296 ret
.set(s_code
, 404);
297 ret
.set(s_error
, "http client failed");
304 bool XboxServer::PostMessage(const String
& message
,
305 const String
& host
/* = "localhost" */) {
306 if (isLocalHost(host
)) {
307 Lock
l(s_dispatchMutex
);
312 XboxTransport
*job
= new XboxTransport(message
.toCppString());
313 job
->incRefCount(); // paired with worker's decRefCount()
314 assertx(s_dispatcher
);
315 s_dispatcher
->enqueue(job
);
320 string url
= "http://";
322 url
+= "/xbox_post_message";
324 std::vector
<std::string
> headers
;
325 std::string
hostStr(host
.data());
326 LibEventHttpClientPtr http
=
327 LibEventHttpClient::Get(hostStr
, RuntimeOption::XboxServerPort
);
328 if (http
->send(url
, headers
, 0, false, message
.data(), message
.size())) {
329 int code
= http
->getCode();
332 char *response
= http
->recv(len
);
333 String
sresponse(response
, len
, AttachString
);
336 unserialize_from_string(
338 VariableUnserializer::Type::Internal
352 ///////////////////////////////////////////////////////////////////////////////
354 struct XboxTask
: SweepableResourceData
{
355 DECLARE_RESOURCE_ALLOCATION(XboxTask
)
357 XboxTask(const String
& message
, const String
& reqInitDoc
= "") {
358 m_job
= new XboxTransport(message
.toCppString(), reqInitDoc
.toCppString());
359 m_job
->incRefCount();
362 ~XboxTask() override
{
363 m_job
->decRefCount();
366 XboxTransport
*getJob() { return m_job
;}
368 CLASSNAME_IS("XboxTask");
369 // overriding ResourceData
370 const String
& o_getClassNameHook() const override
{ return classnameof(); }
373 XboxTransport
*m_job
;
375 IMPLEMENT_RESOURCE_ALLOCATION(XboxTask
)
377 ///////////////////////////////////////////////////////////////////////////////
379 Resource
XboxServer::TaskStart(const String
& msg
,
380 const String
& reqInitDoc
/* = "" */,
381 ServerTaskEvent
<XboxServer
, XboxTransport
> *event
/* = nullptr */) {
383 Lock
l(s_dispatchMutex
);
385 (s_dispatcher
->getActiveWorker() <
386 RuntimeOption::XboxServerThreadCount
||
387 s_dispatcher
->getQueuedJobs() <
388 RuntimeOption::XboxServerMaxQueueLength
)) {
389 auto task
= req::make
<XboxTask
>(msg
, reqInitDoc
);
390 XboxTransport
*job
= task
->getJob();
391 job
->incRefCount(); // paired with worker's decRefCount()
393 Transport
*transport
= g_context
->getTransport();
395 job
->setHost(transport
->getHeader("Host"));
399 job
->setAsioEvent(event
);
403 assertx(s_dispatcher
);
404 s_dispatcher
->enqueue(job
);
406 return Resource(std::move(task
));
410 (RuntimeOption::XboxServerThreadCount
> 0 ?
411 "Cannot create new Xbox task because the Xbox queue has "
412 "reached maximum capacity" :
413 "Cannot create new Xbox task because the Xbox is not enabled");
415 throw_exception(SystemLib::AllocExceptionObject(errMsg
));
419 void XboxServer::TaskStartFromNonRequest(
420 const folly::StringPiece msg
,
421 const folly::StringPiece reqInitDoc
/* ="" */
424 Lock
l(s_dispatchMutex
);
426 (s_dispatcher
->getActiveWorker() <
427 RuntimeOption::XboxServerThreadCount
||
428 s_dispatcher
->getQueuedJobs() <
429 RuntimeOption::XboxServerMaxQueueLength
)) {
430 XboxTransport
*job
= new XboxTransport(msg
, reqInitDoc
);
431 job
->incRefCount(); // paired with worker's decRefCount()
433 assertx(s_dispatcher
);
434 s_dispatcher
->enqueue(job
);
439 (RuntimeOption::XboxServerThreadCount
> 0 ?
440 "Cannot create new Xbox task because the Xbox queue has "
441 "reached maximum capacity" :
442 "Cannot create new Xbox task because the Xbox is not enabled");
444 throw std::runtime_error(errMsg
);
447 bool XboxServer::TaskStatus(const Resource
& task
) {
448 return cast
<XboxTask
>(task
)->getJob()->isDone();
451 int XboxServer::TaskResult(const Resource
& task
, int timeout_ms
, Variant
*ret
) {
452 return TaskResult(cast
<XboxTask
>(task
)->getJob(), timeout_ms
, ret
);
455 int XboxServer::TaskResult(XboxTransport
*job
, int timeout_ms
, Variant
*ret
) {
457 String response
= job
->getResults(code
, timeout_ms
);
461 unserialize_from_string(response
, VariableUnserializer::Type::Internal
);
469 std::shared_ptr
<XboxServerInfo
> XboxServer::GetServerInfo() {
470 return *s_xbox_server_info
;
473 RPCRequestHandler
*XboxServer::GetRequestHandler() {
474 if (s_xbox_request_handler
.isNull()) return nullptr;
475 return s_xbox_request_handler
.get();
478 ///////////////////////////////////////////////////////////////////////////////