Create dedicated crate for php_escaping.rs
[hiphop-php.git] / hphp / runtime / server / pagelet-server.cpp
blob49db0ce2d2012ac4b2fe0b448f7bdee828d61fea
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #include "hphp/runtime/server/pagelet-server.h"
19 #include "hphp/runtime/server/transport.h"
20 #include "hphp/runtime/server/http-request-handler.h"
21 #include "hphp/runtime/server/upload.h"
22 #include "hphp/runtime/server/job-queue-vm-stack.h"
23 #include "hphp/runtime/server/host-health-monitor.h"
24 #include "hphp/runtime/base/array-init.h"
25 #include "hphp/runtime/base/builtin-functions.h"
26 #include "hphp/runtime/base/string-buffer.h"
27 #include "hphp/runtime/base/runtime-option.h"
28 #include "hphp/runtime/ext/server/ext_server.h"
29 #include "hphp/util/boot-stats.h"
30 #include "hphp/util/compatibility.h"
31 #include "hphp/util/job-queue.h"
32 #include "hphp/util/lock.h"
33 #include "hphp/util/logger.h"
34 #include "hphp/util/service-data.h"
35 #include "hphp/util/timer.h"
37 using std::set;
38 using std::deque;
40 namespace HPHP {
41 ///////////////////////////////////////////////////////////////////////////////
43 PageletTransport::PageletTransport(
44 const String& url, const Array& headers, const String& postData,
45 const String& remoteHost, const set<std::string> &rfc1867UploadedFiles,
46 const Array& files, int timeoutSeconds)
47 : m_refCount(0),
48 m_timeoutSeconds(timeoutSeconds),
49 m_done(false),
50 m_code(0),
51 m_event(nullptr) {
53 Timer::GetMonotonicTime(m_queueTime);
54 m_threadType = ThreadType::PageletThread;
56 m_url.append(url.data(), url.size());
57 m_remoteHost.append(remoteHost.data(), remoteHost.size());
59 for (ArrayIter iter(headers); iter; ++iter) {
60 auto const key = iter.first();
61 auto const header = iter.second().toString();
62 if (key.isString() && !key.toString().empty()) {
63 m_requestHeaders[key.toString().data()].push_back(header.data());
64 } else {
65 int pos = header.find(": ");
66 if (pos >= 0) {
67 std::string name = header.substr(0, pos).data();
68 std::string value = header.substr(pos + 2).data();
69 m_requestHeaders[name].push_back(value);
70 } else {
71 Logger::Error("throwing away bad header: %s", header.data());
76 if (postData.empty()) {
77 m_get = true;
78 } else {
79 m_get = false;
80 m_postData.append(postData.data(), postData.size());
83 disableCompression(); // so we don't have to decompress during sendImpl()
84 m_rfc1867UploadedFiles = rfc1867UploadedFiles;
85 m_files = (std::string) f_serialize(files);
88 const char *PageletTransport::getUrl() {
89 return m_url.c_str();
92 const char *PageletTransport::getRemoteHost() {
93 return m_remoteHost.c_str();
96 uint16_t PageletTransport::getRemotePort() {
97 return 0;
100 const void *PageletTransport::getPostData(size_t &size) {
101 size = m_postData.size();
102 return m_postData.data();
105 Transport::Method PageletTransport::getMethod() {
106 return m_get ? Transport::Method::GET : Transport::Method::POST;
109 std::string PageletTransport::getHeader(const char *name) {
110 assertx(name && *name);
111 HeaderMap::const_iterator iter = m_requestHeaders.find(name);
112 if (iter != m_requestHeaders.end()) {
113 return iter->second[0];
115 return "";
118 const HeaderMap& PageletTransport::getHeaders() {
119 return m_requestHeaders;
122 void PageletTransport::addHeaderImpl(const char *name, const char *value) {
123 assertx(name && *name);
124 assertx(value);
125 m_responseHeaders[name].push_back(value);
128 void PageletTransport::removeHeaderImpl(const char *name) {
129 assertx(name && *name);
130 m_responseHeaders.erase(name);
133 void PageletTransport::sendImpl(const void* data, int size, int code,
134 bool /*chunked*/, bool eom) {
135 m_response.append((const char*)data, size);
136 if (code) {
137 m_code = code;
139 if (eom) {
140 onSendEndImpl();
144 void PageletTransport::onSendEndImpl() {
145 Lock lock(this);
146 m_done = true;
147 if (m_event) {
148 constexpr uintptr_t kTrashedEvent = 0xfeeefeeef001f001;
149 m_event->finish();
150 m_event = reinterpret_cast<PageletServerTaskEvent*>(kTrashedEvent);
152 notify();
155 bool PageletTransport::isUploadedFile(const String& filename) {
156 return m_rfc1867UploadedFiles.find(filename.c_str()) !=
157 m_rfc1867UploadedFiles.end();
160 bool PageletTransport::getFiles(std::string &files) {
161 files = m_files;
162 return true;
165 bool PageletTransport::isDone() {
166 return m_done;
169 void PageletTransport::addToPipeline(const std::string &s) {
170 // the output buffer is already closed; nothing to do
171 if (m_done) return;
173 Lock lock(this);
174 m_pipeline.push_back(s);
175 if (m_event) {
176 m_event->finish();
177 m_event = nullptr;
179 notify();
182 bool PageletTransport::isPipelineEmpty() {
183 Lock lock(this);
184 return m_pipeline.empty();
187 Array PageletTransport::getAsyncResults(bool allow_empty) {
188 auto results = Array::CreateVArray();
189 PageletServerTaskEvent* next_event = nullptr;
190 int code = 0;
193 Lock lock(this);
194 assertx(m_done || !m_pipeline.empty() || allow_empty);
195 while (!m_pipeline.empty()) {
196 std::string &str = m_pipeline.front();
197 String response(str.c_str(), str.size(), CopyString);
198 results.append(response);
199 m_pipeline.pop_front();
203 if (m_done) {
204 code = m_code;
205 String response(m_response.c_str(), m_response.size(), CopyString);
206 results.append(response);
207 } else {
208 next_event = new PageletServerTaskEvent();
209 m_event = next_event;
210 m_event->setJob(this);
214 return VArrayInit(3)
215 .append(results)
216 .append(next_event
217 ? make_tv<KindOfObject>(next_event->getWaitHandle())
218 : make_tv<KindOfNull>())
219 .append(make_tv<KindOfInt64>(code))
220 .toArray();
223 String PageletTransport::getResults(
224 Array &headers,
225 int &code,
226 int64_t timeout_ms
229 Lock lock(this);
230 while (!m_done && m_pipeline.empty()) {
231 if (timeout_ms > 0) {
232 long seconds = timeout_ms / 1000;
233 long long nanosecs = (timeout_ms % 1000) * 1000000;
234 if (!wait(seconds, nanosecs)) {
235 code = -1;
236 return empty_string();
238 } else {
239 wait();
243 if (!m_pipeline.empty()) {
244 // intermediate results do not have headers and code
245 std::string ret = m_pipeline.front();
246 m_pipeline.pop_front();
247 code = 0;
248 return ret;
252 String response(m_response.c_str(), m_response.size(), CopyString);
253 headers = Array::Create();
254 for (HeaderMap::const_iterator iter = m_responseHeaders.begin();
255 iter != m_responseHeaders.end(); ++iter) {
256 for (unsigned int i = 0; i < iter->second.size(); i++) {
257 StringBuffer sb;
258 sb.append(iter->first);
259 sb.append(": ");
260 sb.append(iter->second[i]);
261 headers.append(sb.detach());
264 code = m_code;
265 return response;
268 // ref counting
269 void PageletTransport::incRefCount() {
270 ++m_refCount;
273 void PageletTransport::decRefCount() {
274 assertx(m_refCount.load() > 0);
275 if (--m_refCount == 0) {
276 delete this;
280 const timespec& PageletTransport::getStartTimer() const {
281 return m_queueTime;
284 int PageletTransport::getTimeoutSeconds() const {
285 return m_timeoutSeconds;
288 void PageletTransport::setAsioEvent(PageletServerTaskEvent *event) {
289 m_event = event;
292 ///////////////////////////////////////////////////////////////////////////////
// Converts a timespec to whole milliseconds (sub-millisecond part truncated).
// The seconds field is widened to int64_t before multiplying so the result
// cannot overflow on platforms where time_t is 32 bits wide.
static int64_t to_ms(const timespec& ts) {
  const int64_t wholeSeconds = static_cast<int64_t>(ts.tv_sec);
  const int64_t nanos = static_cast<int64_t>(ts.tv_nsec);
  return wholeSeconds * 1000 + nanos / 1000000;
}
298 struct PageletWorker
299 : JobQueueWorker<PageletTransport*,Server*,true,false,JobQueueDropVMStack>
301 void doJob(PageletTransport *job) override {
302 try {
303 job->onRequestStart(job->getStartTimer());
304 int timeout = job->getTimeoutSeconds();
305 if (timeout > 0) {
306 timespec ts;
307 Timer::GetMonotonicTime(ts);
308 int64_t delta_ms =
309 to_ms(job->getStartTimer()) + timeout * 1000 - to_ms(ts);
310 if (delta_ms > 500) {
311 timeout = (delta_ms + 500) / 1000;
312 } else {
313 timeout = 1;
315 } else {
316 timeout = 0;
318 HttpRequestHandler(timeout).run(job);
319 job->decRefCount();
320 } catch (...) {
321 Logger::Error("HttpRequestHandler leaked exceptions");
326 ///////////////////////////////////////////////////////////////////////////////
328 struct PageletTask : SweepableResourceData {
329 DECLARE_RESOURCE_ALLOCATION(PageletTask)
331 PageletTask(const String& url, const Array& headers, const String& post_data,
332 const String& remote_host,
333 const std::set<std::string> &rfc1867UploadedFiles,
334 const Array& files, int timeoutSeconds) {
335 m_job = new PageletTransport(url, headers, remote_host, post_data,
336 rfc1867UploadedFiles, files, timeoutSeconds);
337 m_job->incRefCount();
340 ~PageletTask() override {
341 m_job->decRefCount();
344 PageletTransport *getJob() { return m_job;}
346 CLASSNAME_IS("PageletTask");
347 // overriding ResourceData
348 const String& o_getClassNameHook() const override { return classnameof(); }
350 private:
351 PageletTransport *m_job;
353 IMPLEMENT_RESOURCE_ALLOCATION(PageletTask)
355 ///////////////////////////////////////////////////////////////////////////////
356 // implementing PageletServer
358 static JobQueueDispatcher<PageletWorker> *s_dispatcher;
359 static Mutex s_dispatchMutex;
360 static ServiceData::CounterCallback s_counters(
361 [](std::map<std::string, int64_t>& counters) {
362 counters["pagelet_inflight_requests"] = PageletServer::GetActiveWorker();
363 counters["pagelet_queued_requests"] = PageletServer::GetQueuedJobs();
367 bool PageletServer::Enabled() {
368 return s_dispatcher;
371 void PageletServer::Restart() {
372 Stop();
373 if (RuntimeOption::PageletServerThreadCount > 0) {
375 Lock l(s_dispatchMutex);
376 s_dispatcher = new JobQueueDispatcher<PageletWorker>
377 (RuntimeOption::PageletServerThreadCount,
378 RuntimeOption::PageletServerThreadCount,
379 RuntimeOption::PageletServerThreadDropCacheTimeoutSeconds,
380 RuntimeOption::PageletServerThreadDropStack,
381 nullptr);
382 s_dispatcher->setHugePageConfig(
383 RuntimeOption::PageletServerHugeThreadCount,
384 RuntimeOption::ServerHugeStackKb);
385 auto monitor = getSingleton<HostHealthMonitor>();
386 monitor->subscribe(s_dispatcher);
388 s_dispatcher->start();
389 BootStats::mark("pagelet server started");
393 void PageletServer::Stop() {
394 if (s_dispatcher) {
395 auto monitor = getSingleton<HostHealthMonitor>();
396 monitor->unsubscribe(s_dispatcher);
397 s_dispatcher->stop();
398 Lock l(s_dispatchMutex);
399 delete s_dispatcher;
400 s_dispatcher = nullptr;
404 Resource PageletServer::TaskStart(
405 const String& url, const Array& headers,
406 const String& remote_host,
407 const String& post_data /* = null_string */,
408 const Array& files /* = null_array */,
409 int timeoutSeconds /* = -1 */,
410 PageletServerTaskEvent *event /* = nullptr*/
412 static auto pageletOverflowCounter =
413 ServiceData::createTimeSeries("pagelet_overflow",
414 { ServiceData::StatsType::COUNT });
416 Lock l(s_dispatchMutex);
417 if (!s_dispatcher) {
418 return Resource();
420 if (RuntimeOption::PageletServerQueueLimit > 0 &&
421 s_dispatcher->getQueuedJobs() >
422 RuntimeOption::PageletServerQueueLimit) {
423 pageletOverflowCounter->addValue(1);
424 return Resource();
427 auto task = req::make<PageletTask>(url, headers, remote_host, post_data,
428 get_uploaded_files(), files,
429 timeoutSeconds);
430 PageletTransport *job = task->getJob();
431 Lock l(s_dispatchMutex);
432 if (s_dispatcher) {
433 job->incRefCount(); // paired with worker's decRefCount()
435 if (event) {
436 job->setAsioEvent(event);
437 event->setJob(job);
440 s_dispatcher->enqueue(job);
441 g_context->incrPageletTasksStarted();
442 return Resource(std::move(task));
444 return Resource();
447 int64_t PageletServer::TaskStatus(const Resource& task) {
448 PageletTransport *job = cast<PageletTask>(task)->getJob();
449 if (!job->isPipelineEmpty()) {
450 return PAGELET_READY;
452 if (job->isDone()) {
453 return PAGELET_DONE;
455 return PAGELET_NOT_READY;
458 String PageletServer::TaskResult(const Resource& task, Array &headers, int &code,
459 int64_t timeout_ms) {
460 auto ptask = cast<PageletTask>(task);
461 return ptask->getJob()->getResults(headers, code, timeout_ms);
464 Array PageletServer::AsyncTaskResult(const Resource& task) {
465 auto ptask = cast<PageletTask>(task);
466 return ptask->getJob()->getAsyncResults(true);
469 void PageletServer::AddToPipeline(const std::string &s) {
470 assertx(!s.empty());
471 PageletTransport *job =
472 dynamic_cast<PageletTransport *>(g_context->getTransport());
473 assertx(job);
474 job->addToPipeline(s);
477 int PageletServer::GetActiveWorker() {
478 Lock l(s_dispatchMutex);
479 return s_dispatcher ? s_dispatcher->getActiveWorker() : 0;
482 int PageletServer::GetQueuedJobs() {
483 Lock l(s_dispatchMutex);
484 return s_dispatcher ? s_dispatcher->getQueuedJobs() : 0;
487 ///////////////////////////////////////////////////////////////////////////////