Use iterator adapters in decl_class_parents
[hiphop-php.git] / hphp / util / coro.cpp
blob3d8a64844726eec0a4064de09e8fceb41f245754
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #include "hphp/util/coro.h"
19 #include <folly/executors/CPUThreadPoolExecutor.h>
20 #include <folly/executors/task_queue/BlockingQueue.h>
21 #include <folly/executors/thread_factory/NamedThreadFactory.h>
22 #include <folly/synchronization/LifoSem.h>
24 #include <mutex>
25 #include <queue>
27 namespace HPHP::coro {
29 //////////////////////////////////////////////////////////////////////
31 using namespace folly;
33 //////////////////////////////////////////////////////////////////////
35 namespace {
37 //////////////////////////////////////////////////////////////////////
39 struct ThreadFactory : NamedThreadFactory {
40 ThreadFactory(const std::string& prefix,
41 std::function<void()> init,
42 std::function<void()> fini)
43 : NamedThreadFactory(prefix)
44 , m_init{std::move(init)}
45 , m_fini{std::move(fini)}
48 std::thread newThread(Func&& func) override {
49 return NamedThreadFactory::newThread(
50 [this, func = std::move(func)] () mutable {
51 m_init();
52 SCOPE_EXIT { m_fini(); };
53 func();
58 std::function<void()> m_init;
59 std::function<void()> m_fini;
62 //////////////////////////////////////////////////////////////////////
64 // The heart of the TicketExecutor. A queue which prioritizes
65 // according to Ticket.
66 struct TicketQueue : BlockingQueue<CPUThreadPoolExecutor::CPUTask> {
67 using T = CPUThreadPoolExecutor::CPUTask;
68 using Ticket = TicketExecutor::Ticket;
70 // Interface specific to TicketQueue:
72 BlockingQueueAddResult add(T item, Ticket ticket) {
74 std::lock_guard<std::mutex> _{m_lock};
75 m_queue.emplace(std::move(item), ticket);
77 return m_sem.post();
80 Ticket stamp() { return m_next++; }
82 // Implement interface that CPUThreadPoolExecutor expects:
84 BlockingQueueAddResult add(T item) override {
85 return add(std::move(item), stamp());
88 T take() override {
89 m_sem.wait();
90 return get();
93 folly::Optional<T> try_take_for(std::chrono::milliseconds time) override {
94 if (!m_sem.try_wait_for(time)) return folly::none;
95 return get();
98 size_t size() override {
99 std::lock_guard<std::mutex> _{m_lock};
100 return m_queue.size();
102 private:
103 T get() {
104 std::lock_guard<std::mutex> _{m_lock};
105 assertx(!m_queue.empty());
106 // This is only safe because m_item does not participate in the
107 // ordering priority_queue uses.
108 auto item = std::move(m_queue.top().m_item);
109 m_queue.pop();
110 return item;
113 struct Pair {
114 Pair(T item, Ticket ticket)
115 : m_item{std::move(item)}, m_ticket{ticket} {}
116 mutable T m_item;
117 Ticket m_ticket;
118 bool operator<(const Pair& o) const {
119 // Note the ordering here. We want *lower* tickets to come
120 // first.
121 return o.m_ticket < m_ticket;
125 std::atomic<Ticket> m_next{0};
127 // Just a priority queue protected by a lock. Performance
128 // measurements shows this isn't anywhere near a bottleneck and more
129 // lock-free data structures are complex.
130 std::priority_queue<Pair> m_queue;
131 LifoSem m_sem;
132 std::mutex m_lock;
135 //////////////////////////////////////////////////////////////////////
137 // Executor which keeps a reference to its parent TicketExecutor and a
138 // Ticket. Every call to add() just delegates to the parent, but calls
139 // addWithTicket() instead.
140 struct StickyTicketExecutor : Executor {
141 ~StickyTicketExecutor() override = default;
143 static Executor::KeepAlive<StickyTicketExecutor>
144 create(Executor::KeepAlive<TicketExecutor> e,
145 TicketExecutor::Ticket t) {
146 return makeKeepAlive(new StickyTicketExecutor(std::move(e), t));
149 StickyTicketExecutor(const StickyTicketExecutor&) = delete;
150 StickyTicketExecutor& operator=(const StickyTicketExecutor&) = delete;
151 StickyTicketExecutor(StickyTicketExecutor&&) = delete;
152 StickyTicketExecutor& operator=(StickyTicketExecutor&&) = delete;
154 void add(Func f) override {
155 m_parent->addWithTicket(std::move(f), m_ticket);
158 protected:
159 // KeepAlive interface:
160 bool keepAliveAcquire() noexcept override {
161 auto const DEBUG_ONLY count =
162 m_counter.fetch_add(1, std::memory_order_relaxed);
163 assertx(count > 0);
164 return true;
167 void keepAliveRelease() noexcept override {
168 auto const count = m_counter.fetch_sub(1, std::memory_order_acq_rel);
169 assertx(count > 0);
170 if (count == 1) delete this;
173 private:
174 StickyTicketExecutor(Executor::KeepAlive<TicketExecutor> parent,
175 TicketExecutor::Ticket ticket)
176 : m_parent{std::move(parent)}
177 , m_ticket{ticket} {}
179 std::atomic<ssize_t> m_counter{1};
180 Executor::KeepAlive<TicketExecutor> m_parent;
181 TicketExecutor::Ticket m_ticket;
184 //////////////////////////////////////////////////////////////////////
188 //////////////////////////////////////////////////////////////////////
190 TicketExecutor::TicketExecutor(const std::string& threadPrefix,
191 size_t minThreads,
192 size_t maxThreads,
193 std::function<void()> threadInit,
194 std::function<void()> threadFini,
195 std::chrono::milliseconds threadTimeout)
196 : CPUThreadPoolExecutor{
197 std::make_pair(maxThreads, minThreads),
198 std::make_unique<TicketQueue>(),
199 std::make_shared<ThreadFactory>(
200 threadPrefix, std::move(threadInit), std::move(threadFini)
204 setThreadDeathTimeout(threadTimeout);
207 TicketExecutor::~TicketExecutor() {
210 void TicketExecutor::addWithTicket(Func func, Ticket ticket) {
211 CPUTask task{std::move(func), std::chrono::milliseconds{0}, nullptr, 0};
213 // See CPUThreadPoolExecutor.cpp why this is needed
214 auto const mayNeedToAddThreads =
215 minThreads_.load(std::memory_order_relaxed) == 0 ||
216 (activeThreads_.load(std::memory_order_relaxed) <
217 maxThreads_.load(std::memory_order_relaxed));
219 Executor::KeepAlive<> ka = mayNeedToAddThreads
220 ? getKeepAliveToken(this)
221 : folly::Executor::KeepAlive<>{};
223 auto queue = static_cast<TicketQueue*>(getTaskQueue());
224 auto const result = queue->add(std::move(task), ticket);
225 if (mayNeedToAddThreads && !result.reusedThread) ensureActiveThreads();
228 TicketExecutor::Ticket TicketExecutor::stamp() {
229 return static_cast<TicketQueue*>(getTaskQueue())->stamp();
232 folly::Executor::KeepAlive<> TicketExecutor::sticky(Ticket ticket) {
233 return StickyTicketExecutor::create(this, ticket);
236 //////////////////////////////////////////////////////////////////////