2 +----------------------------------------------------------------------+
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #include "hphp/runtime/server/pagelet-server.h"
19 #include "hphp/runtime/server/transport.h"
20 #include "hphp/runtime/server/http-request-handler.h"
21 #include "hphp/runtime/server/upload.h"
22 #include "hphp/runtime/server/job-queue-vm-stack.h"
23 #include "hphp/runtime/server/host-health-monitor.h"
24 #include "hphp/runtime/base/array-init.h"
25 #include "hphp/runtime/base/builtin-functions.h"
26 #include "hphp/runtime/base/string-buffer.h"
27 #include "hphp/runtime/base/runtime-option.h"
28 #include "hphp/runtime/ext/server/ext_server.h"
29 #include "hphp/util/boot-stats.h"
30 #include "hphp/util/compatibility.h"
31 #include "hphp/util/job-queue.h"
32 #include "hphp/util/lock.h"
33 #include "hphp/util/logger.h"
34 #include "hphp/util/service-data.h"
35 #include "hphp/util/timer.h"
41 ///////////////////////////////////////////////////////////////////////////////
// Constructs a pagelet transport from the caller-supplied request pieces.
// The visible statements record the queue time, mark the thread type as
// PageletThread, and copy the URL, remote host, request headers, POST data,
// the set of RFC 1867 uploaded files, and the serialized `files` array into
// members.
//
// Parameter order is (url, headers, postData, remoteHost, ...): postData
// comes BEFORE remoteHost.
//
// NOTE(review): this view of the constructor is incomplete -- most of the
// member-initializer list, some branch heads/else-arms and the closing braces
// are not visible -- so the comments below describe only what is shown.
43 PageletTransport::PageletTransport(
44 const String
& url
, const Array
& headers
, const String
& postData
,
45 const String
& remoteHost
, const set
<std::string
> &rfc1867UploadedFiles
,
46 const Array
& files
, int timeoutSeconds
)
48 m_timeoutSeconds(timeoutSeconds
),
53 Timer::GetMonotonicTime(m_queueTime
);
54 m_threadType
= ThreadType::PageletThread
;
56 m_url
.append(url
.data(), url
.size());
57 m_remoteHost
.append(remoteHost
.data(), remoteHost
.size());
// Each entry of `headers` is either keyed by a non-empty string (the key is
// used as the header name and the value stored as-is) or is a raw header line
// that is split around the first ": " into name and value.
59 for (ArrayIter
iter(headers
); iter
; ++iter
) {
60 auto const key
= iter
.first();
61 auto const header
= iter
.second().toString();
62 if (key
.isString() && !key
.toString().empty()) {
63 m_requestHeaders
[key
.toString().data()].push_back(header
.data());
65 int pos
= header
.find(": ");
67 std::string name
= header
.substr(0, pos
).data();
68 std::string value
= header
.substr(pos
+ 2).data();
69 m_requestHeaders
[name
].push_back(value
);
// Header lines that cannot be used are logged and discarded.
// NOTE(review): the condition guarding this branch is not visible here.
71 Logger::Error("throwing away bad header: %s", header
.data());
// NOTE(review): the statements in the empty-postData branch are not visible;
// the visible part copies non-empty POST data into m_postData.
76 if (postData
.empty()) {
80 m_postData
.append(postData
.data(), postData
.size());
83 disableCompression(); // so we don't have to decompress during sendImpl()
84 m_rfc1867UploadedFiles
= rfc1867UploadedFiles
;
// The uploaded-files array is kept in serialized (string) form.
85 m_files
= (std::string
) f_serialize(files
);
// Accessor returning a C string; by its name, presumably the request URL
// held in m_url.
// NOTE(review): the function body is not visible in this view -- confirm.
88 const char *PageletTransport::getUrl() {
92 const char *PageletTransport::getRemoteHost() {
93 return m_remoteHost
.c_str();
// Accessor returning a 16-bit port number for the remote peer (per its name).
// NOTE(review): the function body is not visible in this view -- confirm what
// value is returned.
96 uint16_t PageletTransport::getRemotePort() {
100 const void *PageletTransport::getPostData(size_t &size
) {
101 size
= m_postData
.size();
102 return m_postData
.data();
105 Transport::Method
PageletTransport::getMethod() {
106 return m_get
? Transport::Method::GET
: Transport::Method::POST
;
// Looks up request header `name` in m_requestHeaders and, when an entry
// exists, returns the first value stored for it.
// `name` must be non-null and non-empty (asserted).
// NOTE(review): the return for the not-found case is not visible in this view.
109 std::string
PageletTransport::getHeader(const char *name
) {
110 assertx(name
&& *name
);
111 HeaderMap::const_iterator iter
= m_requestHeaders
.find(name
);
112 if (iter
!= m_requestHeaders
.end()) {
113 return iter
->second
[0];
118 const HeaderMap
& PageletTransport::getHeaders() {
119 return m_requestHeaders
;
// Appends `value` to the list of response-header values stored under `name`
// in m_responseHeaders. `name` must be non-null and non-empty (asserted).
// NOTE(review): one line between the assertion and the push_back is not
// visible in this view.
122 void PageletTransport::addHeaderImpl(const char *name
, const char *value
) {
123 assertx(name
&& *name
);
125 m_responseHeaders
[name
].push_back(value
);
128 void PageletTransport::removeHeaderImpl(const char *name
) {
129 assertx(name
&& *name
);
130 m_responseHeaders
.erase(name
);
// Buffers response output: appends `size` bytes starting at `data` to
// m_response. The chunked flag is an unnamed parameter and therefore unused.
// NOTE(review): the remainder of the body (whatever is done with `code` and
// `eom`) is not visible in this view.
133 void PageletTransport::sendImpl(const void* data
, int size
, int code
,
134 bool /*chunked*/, bool eom
) {
135 m_response
.append((const char*)data
, size
);
// Hook invoked when sending ends. The visible portion overwrites m_event with
// a fixed, recognizable sentinel address (kTrashedEvent) instead of a real
// event pointer.
// NOTE(review): the statements around this assignment (and any condition
// guarding it) are not visible in this view.
144 void PageletTransport::onSendEndImpl() {
148 constexpr uintptr_t kTrashedEvent
= 0xfeeefeeef001f001;
150 m_event
= reinterpret_cast<PageletServerTaskEvent
*>(kTrashedEvent
);
155 bool PageletTransport::isUploadedFile(const String
& filename
) {
156 return m_rfc1867UploadedFiles
.find(filename
.c_str()) !=
157 m_rfc1867UploadedFiles
.end();
// Out-parameter accessor: takes `files` by non-const reference and returns a
// bool. Presumably hands back the serialized uploaded-files string stored in
// m_files.
// NOTE(review): the function body is not visible in this view -- confirm.
160 bool PageletTransport::getFiles(std::string
&files
) {
// Boolean query; by its name, presumably reports the m_done flag.
// NOTE(review): the function body is not visible in this view -- confirm
// (including whether any locking is involved).
165 bool PageletTransport::isDone() {
// Queues the intermediate result `s` at the back of m_pipeline.
// NOTE(review): per the retained comment there is also a path where the
// output buffer is already closed and nothing is done; the lines implementing
// that check (and anything after the push_back) are not visible in this view.
169 void PageletTransport::addToPipeline(const std::string
&s
) {
170 // the output buffer is already closed; nothing to do
174 m_pipeline
.push_back(s
);
// Returns true when no intermediate results are queued in m_pipeline.
// NOTE(review): one line preceding the return is not visible in this view.
182 bool PageletTransport::isPipelineEmpty() {
184 return m_pipeline
.empty();
// Collects results for the async (wait-handle based) API.
// Visible behavior:
//   - asserts that the job is done, or the pipeline is non-empty, or the
//     caller passed allow_empty;
//   - drains m_pipeline front-to-back, appending a copy of each entry to
//     `results`;
//   - appends a copy of m_response to `results`;
//   - creates a fresh PageletServerTaskEvent, stores it in m_event and binds
//     it to this job;
//   - builds a value containing either the new event's wait handle or null,
//     followed by the integer `code`.
// NOTE(review): the conditions selecting between these steps, any locking,
// the declaration of `code`, and the return statement are not visible in
// this view.
187 Array
PageletTransport::getAsyncResults(bool allow_empty
) {
188 auto results
= Array::CreateVArray();
189 PageletServerTaskEvent
* next_event
= nullptr;
194 assertx(m_done
|| !m_pipeline
.empty() || allow_empty
);
195 while (!m_pipeline
.empty()) {
196 std::string
&str
= m_pipeline
.front();
197 String
response(str
.c_str(), str
.size(), CopyString
);
198 results
.append(response
);
199 m_pipeline
.pop_front();
205 String
response(m_response
.c_str(), m_response
.size(), CopyString
);
206 results
.append(response
);
208 next_event
= new PageletServerTaskEvent();
209 m_event
= next_event
;
210 m_event
->setJob(this);
217 ? make_tv
<KindOfObject
>(next_event
->getWaitHandle())
218 : make_tv
<KindOfNull
>())
219 .append(make_tv
<KindOfInt64
>(code
))
// Blocking result retrieval.
// Visible behavior:
//   - loops while the job is not done and the pipeline is empty; with a
//     positive timeout_ms the timeout is split into whole seconds plus
//     nanoseconds for wait(), and an empty string is returned when wait()
//     reports failure;
//   - if the pipeline has an entry, the front entry is taken and popped
//     (intermediate results carry no headers or code);
//   - otherwise the final response is copied out of m_response and `headers`
//     is rebuilt with one element per (name, value) pair found in
//     m_responseHeaders.
// NOTE(review): the parameter list, locking, the untimed wait path, the
// separator appended between header name and value, the assignment of `code`
// and the return statements are not visible in this view.
223 String
PageletTransport::getResults(
230 while (!m_done
&& m_pipeline
.empty()) {
231 if (timeout_ms
> 0) {
232 long seconds
= timeout_ms
/ 1000;
233 long long nanosecs
= (timeout_ms
% 1000) * 1000000;
234 if (!wait(seconds
, nanosecs
)) {
236 return empty_string();
243 if (!m_pipeline
.empty()) {
244 // intermediate results do not have headers and code
245 std::string ret
= m_pipeline
.front();
246 m_pipeline
.pop_front();
252 String
response(m_response
.c_str(), m_response
.size(), CopyString
);
253 headers
= Array::Create();
254 for (HeaderMap::const_iterator iter
= m_responseHeaders
.begin();
255 iter
!= m_responseHeaders
.end(); ++iter
) {
256 for (unsigned int i
= 0; i
< iter
->second
.size(); i
++) {
258 sb
.append(iter
->first
);
260 sb
.append(iter
->second
[i
]);
261 headers
.append(sb
.detach());
// Takes a reference on this transport (counterpart of decRefCount()).
// NOTE(review): the function body is not visible in this view; presumably it
// increments m_refCount -- confirm.
269 void PageletTransport::incRefCount() {
// Releases one reference: asserts the count is positive, then decrements
// m_refCount and takes an action when it reaches zero.
// NOTE(review): the statement executed at zero is not visible in this view
// (presumably the object is destroyed -- confirm).
273 void PageletTransport::decRefCount() {
274 assertx(m_refCount
.load() > 0);
275 if (--m_refCount
== 0) {
// Const accessor returning a reference to a timespec that callers use as the
// job's start time.
// NOTE(review): the function body is not visible in this view; presumably it
// returns m_queueTime, which the constructor fills -- confirm.
280 const timespec
& PageletTransport::getStartTimer() const {
284 int PageletTransport::getTimeoutSeconds() const {
285 return m_timeoutSeconds
;
// Associates an asio task event with this transport.
// NOTE(review): the function body is not visible in this view; presumably it
// stores `event` in m_event -- confirm.
288 void PageletTransport::setAsioEvent(PageletServerTaskEvent
*event
) {
292 ///////////////////////////////////////////////////////////////////////////////
// Converts a timespec to whole milliseconds.
//
// Both fields are widened to int64_t before any arithmetic so that the
// seconds-to-milliseconds multiplication cannot overflow on platforms where
// time_t is only 32 bits wide. Nanoseconds below one millisecond are
// truncated.
static int64_t to_ms(const timespec& ts) {
  constexpr int64_t kMsPerSec = 1000;
  constexpr int64_t kNsPerMs = 1000000;
  return static_cast<int64_t>(ts.tv_sec) * kMsPerSec +
         static_cast<int64_t>(ts.tv_nsec) / kNsPerMs;
}
// Worker type for the pagelet job queue (a JobQueueWorker specialization over
// PageletTransport* jobs). doJob() runs one job through HttpRequestHandler.
//
// Visible behavior of doJob():
//   - notifies the job that the request is starting, passing its start time;
//   - computes the remaining time budget in milliseconds as
//     start + timeout*1000 - now, and when more than 500 ms remain, converts
//     it back to seconds rounded to the nearest second;
//   - runs HttpRequestHandler with the resulting timeout;
//   - logs an error if the handler leaks an exception.
// NOTE(review): the struct header line, the declarations of `ts`/`delta_ms`,
// the conditions around the timeout adjustment, the try/catch framing and the
// post-run cleanup are not visible in this view.
299 : JobQueueWorker
<PageletTransport
*,Server
*,true,false,JobQueueDropVMStack
>
301 void doJob(PageletTransport
*job
) override
{
303 job
->onRequestStart(job
->getStartTimer());
304 int timeout
= job
->getTimeoutSeconds();
307 Timer::GetMonotonicTime(ts
);
309 to_ms(job
->getStartTimer()) + timeout
* 1000 - to_ms(ts
);
310 if (delta_ms
> 500) {
311 timeout
= (delta_ms
+ 500) / 1000;
318 HttpRequestHandler(timeout
).run(job
);
321 Logger::Error("HttpRequestHandler leaked exceptions");
326 ///////////////////////////////////////////////////////////////////////////////
// Request-local resource wrapping one PageletTransport job. The constructor
// allocates the transport and takes a reference on it; the destructor drops
// that reference. getJob() exposes the raw transport pointer.
//
// NOTE(review): argument-order hazard. This constructor declares
// (..., post_data, remote_host, ...) but forwards them to PageletTransport as
// (..., remote_host, post_data, ...), while PageletTransport's constructor
// declares (..., postData, remoteHost, ...). PageletServer::TaskStart in turn
// passes (remote_host, post_data) into this constructor, so the two swaps
// cancel and each value ends up in the right member -- but the parameter
// names inside this constructor are effectively reversed. Do not "fix" only
// one of the call sites.
//
// NOTE(review): a few lines of the struct (closing braces and possibly an
// access specifier before m_job) are not visible in this view.
328 struct PageletTask
: SweepableResourceData
{
329 DECLARE_RESOURCE_ALLOCATION(PageletTask
)
331 PageletTask(const String
& url
, const Array
& headers
, const String
& post_data
,
332 const String
& remote_host
,
333 const std::set
<std::string
> &rfc1867UploadedFiles
,
334 const Array
& files
, int timeoutSeconds
) {
335 m_job
= new PageletTransport(url
, headers
, remote_host
, post_data
,
336 rfc1867UploadedFiles
, files
, timeoutSeconds
);
// Reference held by this task; released in the destructor.
337 m_job
->incRefCount();
340 ~PageletTask() override
{
341 m_job
->decRefCount();
344 PageletTransport
*getJob() { return m_job
;}
346 CLASSNAME_IS("PageletTask");
347 // overriding ResourceData
348 const String
& o_getClassNameHook() const override
{ return classnameof(); }
351 PageletTransport
*m_job
;
353 IMPLEMENT_RESOURCE_ALLOCATION(PageletTask
)
355 ///////////////////////////////////////////////////////////////////////////////
356 // implementing PageletServer
358 static JobQueueDispatcher
<PageletWorker
> *s_dispatcher
;
359 static Mutex s_dispatchMutex
;
360 static ServiceData::CounterCallback
s_counters(
361 [](std::map
<std::string
, int64_t>& counters
) {
362 counters
["pagelet_inflight_requests"] = PageletServer::GetActiveWorker();
363 counters
["pagelet_queued_requests"] = PageletServer::GetQueuedJobs();
// Boolean query reporting whether the pagelet server is enabled (per its
// name).
// NOTE(review): the function body is not visible in this view -- confirm what
// state it inspects.
367 bool PageletServer::Enabled() {
// (Re)creates and starts the pagelet dispatcher.
// Visible behavior, when RuntimeOption::PageletServerThreadCount is positive:
//   - under s_dispatchMutex, allocates a new JobQueueDispatcher<PageletWorker>
//     configured from the PageletServer* runtime options (the thread count is
//     passed for two consecutive arguments), applies the huge-page config and
//     subscribes the dispatcher to the HostHealthMonitor singleton;
//   - starts the dispatcher and records a boot-stats mark.
// NOTE(review): the first statement of the function, the tail of the
// dispatcher constructor argument list, and the scoping braces are not
// visible in this view.
371 void PageletServer::Restart() {
373 if (RuntimeOption::PageletServerThreadCount
> 0) {
375 Lock
l(s_dispatchMutex
);
376 s_dispatcher
= new JobQueueDispatcher
<PageletWorker
>
377 (RuntimeOption::PageletServerThreadCount
,
378 RuntimeOption::PageletServerThreadCount
,
379 RuntimeOption::PageletServerThreadDropCacheTimeoutSeconds
,
380 RuntimeOption::PageletServerThreadDropStack
,
382 s_dispatcher
->setHugePageConfig(
383 RuntimeOption::PageletServerHugeThreadCount
,
384 RuntimeOption::ServerHugeStackKb
);
385 auto monitor
= getSingleton
<HostHealthMonitor
>();
386 monitor
->subscribe(s_dispatcher
);
388 s_dispatcher
->start();
389 BootStats::mark("pagelet server started");
// Shuts the pagelet dispatcher down: unsubscribes it from the
// HostHealthMonitor singleton, stops it, then -- under s_dispatchMutex --
// clears the s_dispatcher pointer.
// NOTE(review): the guard at the top of the function and the statement
// between taking the lock and nulling the pointer are not visible in this
// view.
393 void PageletServer::Stop() {
395 auto monitor
= getSingleton
<HostHealthMonitor
>();
396 monitor
->unsubscribe(s_dispatcher
);
397 s_dispatcher
->stop();
398 Lock
l(s_dispatchMutex
);
400 s_dispatcher
= nullptr;
// Creates a PageletTask for the given request and enqueues its job on the
// pagelet dispatcher, returning the task as a Resource.
// Visible behavior:
//   - under s_dispatchMutex, when a positive PageletServerQueueLimit is
//     configured and the dispatcher's queued-job count exceeds it, bumps the
//     "pagelet_overflow" time series;
//   - builds the task via req::make<PageletTask>, passing the current
//     request's uploaded files;
//   - under s_dispatchMutex again, takes an extra reference on the job for
//     the worker, attaches `event`, enqueues the job and increments the
//     per-request started-tasks counter.
//
// NOTE(review): (remote_host, post_data) are passed to PageletTask in the
// opposite order from its declared (post_data, remote_host) parameters;
// PageletTask swaps them back when constructing PageletTransport, so the net
// effect is correct. Keep the two sites in sync.
//
// NOTE(review): the closing of the parameter list, the null-dispatcher and
// overflow early-returns, the last req::make argument and the condition
// around setAsioEvent are not visible in this view.
404 Resource
PageletServer::TaskStart(
405 const String
& url
, const Array
& headers
,
406 const String
& remote_host
,
407 const String
& post_data
/* = null_string */,
408 const Array
& files
/* = null_array */,
409 int timeoutSeconds
/* = -1 */,
410 PageletServerTaskEvent
*event
/* = nullptr*/
412 static auto pageletOverflowCounter
=
413 ServiceData::createTimeSeries("pagelet_overflow",
414 { ServiceData::StatsType::COUNT
});
416 Lock
l(s_dispatchMutex
);
420 if (RuntimeOption::PageletServerQueueLimit
> 0 &&
421 s_dispatcher
->getQueuedJobs() >
422 RuntimeOption::PageletServerQueueLimit
) {
423 pageletOverflowCounter
->addValue(1);
427 auto task
= req::make
<PageletTask
>(url
, headers
, remote_host
, post_data
,
428 get_uploaded_files(), files
,
430 PageletTransport
*job
= task
->getJob();
431 Lock
l(s_dispatchMutex
);
433 job
->incRefCount(); // paired with worker's decRefCount()
436 job
->setAsioEvent(event
);
440 s_dispatcher
->enqueue(job
);
441 g_context
->incrPageletTasksStarted();
442 return Resource(std::move(task
));
// Reports the status of a pagelet task: PAGELET_READY as soon as the job has
// at least one queued pipeline entry; PAGELET_NOT_READY is the visible
// fall-through result.
// NOTE(review): the lines between the two returns (presumably a completed-job
// check yielding another status) are not visible in this view.
447 int64_t PageletServer::TaskStatus(const Resource
& task
) {
448 PageletTransport
*job
= cast
<PageletTask
>(task
)->getJob();
449 if (!job
->isPipelineEmpty()) {
450 return PAGELET_READY
;
455 return PAGELET_NOT_READY
;
458 String
PageletServer::TaskResult(const Resource
& task
, Array
&headers
, int &code
,
459 int64_t timeout_ms
) {
460 auto ptask
= cast
<PageletTask
>(task
);
461 return ptask
->getJob()->getResults(headers
, code
, timeout_ms
);
464 Array
PageletServer::AsyncTaskResult(const Resource
& task
) {
465 auto ptask
= cast
<PageletTask
>(task
);
466 return ptask
->getJob()->getAsyncResults(true);
// Pushes the intermediate result `s` onto the pipeline of the pagelet job
// serving the current request: the current transport is downcast to
// PageletTransport and the string is forwarded to its addToPipeline().
// NOTE(review): the lines before the downcast and between the downcast and
// the call (presumably precondition/null checks) are not visible in this
// view.
469 void PageletServer::AddToPipeline(const std::string
&s
) {
471 PageletTransport
*job
=
472 dynamic_cast<PageletTransport
*>(g_context
->getTransport());
474 job
->addToPipeline(s
);
477 int PageletServer::GetActiveWorker() {
478 Lock
l(s_dispatchMutex
);
479 return s_dispatcher
? s_dispatcher
->getActiveWorker() : 0;
482 int PageletServer::GetQueuedJobs() {
483 Lock
l(s_dispatchMutex
);
484 return s_dispatcher
? s_dispatcher
->getQueuedJobs() : 0;
487 ///////////////////////////////////////////////////////////////////////////////