Refactor back ends into implementations of BackEnd.
[hiphop-php.git] / hphp / runtime / vm / treadmill.cpp
blob30aa275fc0951f8a9442f7eed8b5a7b21e59c510
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-2014 Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #include "hphp/runtime/vm/treadmill.h"
19 #include <list>
20 #include <atomic>
21 #include <vector>
22 #include <memory>
23 #include <algorithm>
25 #include <sys/time.h>
27 #include <stdlib.h>
28 #include <pthread.h>
29 #include <stdio.h>
31 #include "hphp/util/trace.h"
32 #include "hphp/util/rank.h"
33 #include "hphp/runtime/base/macros.h"
34 #include "hphp/runtime/vm/jit/mc-generator.h"
36 namespace HPHP { namespace Treadmill {
38 TRACE_SET_MOD(treadmill);
40 namespace {
42 //////////////////////////////////////////////////////////////////////
44 const int64_t ONE_SEC_IN_MICROSEC = 1000000;
46 pthread_mutex_t s_genLock = PTHREAD_MUTEX_INITIALIZER;
47 const GenCount kIdleGenCount = 0; // not processing any requests.
48 std::vector<GenCount> s_inflightRequests;
49 GenCount s_latestCount = 0;
50 std::atomic<GenCount> s_oldestRequestInFlight(0);
/*
 * We assign local, unique indexes to each thread, with hopes that
 * they are densely packed.
 *
 * The plan here is that each thread starts with s_thisThreadIdx as
 * -1.  The first time a thread starts using the Treadmill, it
 * allocates a new thread id from s_nextThreadIdx with fetch_add.
 */
// Next thread index to hand out; bumped with fetch_add in startRequest().
std::atomic<int64_t> s_nextThreadIdx(0);

// This thread's index, or -1 until startRequest() has assigned one.
__thread int64_t s_thisThreadIdx = -1;
63 //////////////////////////////////////////////////////////////////////
/*
 * The next 2 functions should be used to manage the generation count/time
 * in the treadmill for both the requests and the work items.
 * The pattern is to call getTime() outside of the lock and correctTime()
 * while holding the lock.
 * That pattern guarantees a monotonically increasing counter.
 * The resolution being microseconds should give us all the room we need
 * to accommodate requests and work items at any conceivable rate and
 * correctTime() should give us correct behavior at any granularity of
 * gettimeofday().
 */
78 * Return the current time in microseconds.
79 * Usually called outside of the lock.
81 GenCount getTime() {
82 struct timeval time;
83 gettimeofday(&time, nullptr);
84 return time.tv_sec * ONE_SEC_IN_MICROSEC + time.tv_usec;
88 * Return a monotonically increasing time given the last time recorded.
89 * This must be called while holding the lock.
91 GenCount correctTime(GenCount time) {
92 s_latestCount = time <= s_latestCount ? s_latestCount + 1 : time;
93 return s_latestCount;
96 struct GenCountGuard {
97 GenCountGuard() {
98 checkRank(RankTreadmill);
99 pthread_mutex_lock(&s_genLock);
100 pushRank(RankTreadmill);
102 ~GenCountGuard() {
103 popRank(RankTreadmill);
104 pthread_mutex_unlock(&s_genLock);
108 //////////////////////////////////////////////////////////////////////
112 typedef std::list<std::unique_ptr<WorkItem>> PendingTriggers;
113 static PendingTriggers s_tq;
115 void enqueueInternal(std::unique_ptr<WorkItem> gt) {
116 GenCount time = getTime();
118 GenCountGuard g;
119 gt->m_gen = correctTime(time);
120 s_tq.emplace_back(std::move(gt));
124 void startRequest() {
125 if (UNLIKELY(s_thisThreadIdx == -1)) {
126 s_thisThreadIdx = s_nextThreadIdx.fetch_add(1);
128 auto const threadId = s_thisThreadIdx;
130 GenCount startTime = getTime();
132 GenCountGuard g;
133 if (threadId >= s_inflightRequests.size()) {
134 s_inflightRequests.resize(threadId + 1, kIdleGenCount);
135 } else {
136 assert(s_inflightRequests[threadId] == kIdleGenCount);
138 s_inflightRequests[threadId] = correctTime(startTime);
139 FTRACE(1, "tid {} start @gen {}\n", threadId,
140 s_inflightRequests[threadId]);
141 if (s_oldestRequestInFlight.load(std::memory_order_relaxed) == 0) {
142 s_oldestRequestInFlight = s_inflightRequests[threadId];
147 void finishRequest() {
148 auto const threadId = s_thisThreadIdx;
149 assert(threadId != -1);
150 FTRACE(1, "tid {} finish\n", threadId);
151 std::vector<std::unique_ptr<WorkItem>> toFire;
153 GenCountGuard g;
154 assert(s_inflightRequests[threadId] != kIdleGenCount);
155 GenCount finishedRequest = s_inflightRequests[threadId];
156 s_inflightRequests[threadId] = kIdleGenCount;
158 // After finishing a request, check to see if we've allowed any triggers
159 // to fire and update the time of the oldest request in flight.
160 // However if the request just finished is not the current oldest we
161 // don't need to check anything as there cannot be any WorkItem to run.
162 if (s_oldestRequestInFlight.load(std::memory_order_relaxed) ==
163 finishedRequest) {
164 GenCount limit = s_latestCount + 1;
165 for (auto val : s_inflightRequests) {
166 if (val != kIdleGenCount && val < limit) {
167 limit = val;
170 // update "oldest in flight" or kill it if there are no running requests
171 s_oldestRequestInFlight = limit == s_latestCount + 1 ? 0 : limit;
173 // collect WorkItem to run
174 auto it = s_tq.begin();
175 auto end = s_tq.end();
176 while (it != end) {
177 TRACE(2, "considering delendum %d\n", int((*it)->m_gen));
178 if ((*it)->m_gen >= limit) {
179 TRACE(2, "not unreachable! %d\n", int((*it)->m_gen));
180 break;
182 toFire.emplace_back(std::move(*it));
183 it = s_tq.erase(it);
187 for (unsigned i = 0; i < toFire.size(); ++i) {
188 toFire[i]->run();
192 int64_t getOldestStartTime() {
193 int64_t time = s_oldestRequestInFlight.load(std::memory_order_relaxed);
194 return time / ONE_SEC_IN_MICROSEC + 1; // round up 1 sec
197 void deferredFree(void* p) {
198 enqueue([p] { free(p); });