2 +----------------------------------------------------------------------+
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #include "hphp/util/coro.h"
19 #include <folly/executors/CPUThreadPoolExecutor.h>
20 #include <folly/executors/task_queue/BlockingQueue.h>
21 #include <folly/executors/thread_factory/NamedThreadFactory.h>
22 #include <folly/synchronization/LifoSem.h>
27 namespace HPHP::coro
{
29 //////////////////////////////////////////////////////////////////////
31 using namespace folly
;
33 //////////////////////////////////////////////////////////////////////
37 //////////////////////////////////////////////////////////////////////
// NamedThreadFactory subclass that carries two caller-supplied
// callbacks. newThread() wraps the thread body in a lambda which
// arranges for m_fini() to run when that lambda's scope exits.
39 struct ThreadFactory
: NamedThreadFactory
{
// Passes `prefix` on to NamedThreadFactory and moves both callbacks
// into the corresponding members.
40 ThreadFactory(const std::string
& prefix
,
41 std::function
<void()> init
,
42 std::function
<void()> fini
)
43 : NamedThreadFactory(prefix
)
44 , m_init
{std::move(init
)}
45 , m_fini
{std::move(fini
)}
// Delegates thread creation to NamedThreadFactory::newThread, handing
// it a mutable lambda that captures `this` and takes ownership of
// `func` by move.
// NOTE(review): the `this` capture presumably relies on the factory
// outliving the threads it creates -- confirm.
48 std::thread
newThread(Func
&& func
) override
{
49 return NamedThreadFactory::newThread(
50 [this, func
= std::move(func
)] () mutable {
// Registers m_fini() to run when the enclosing scope exits.
52 SCOPE_EXIT
{ m_fini(); };
// Callbacks supplied at construction. m_fini is run via the SCOPE_EXIT
// in newThread().
// NOTE(review): m_init is presumably invoked when the thread starts
// -- confirm.
58 std::function
<void()> m_init
;
59 std::function
<void()> m_fini
;
62 //////////////////////////////////////////////////////////////////////
64 // The heart of the TicketExecutor. A queue which prioritizes
65 // according to Ticket.
// Entries are kept in a std::priority_queue of (task, ticket) pairs
// that is accessed under m_lock; tickets come either from the caller
// or from the atomic counter behind stamp().
66 struct TicketQueue
: BlockingQueue
<CPUThreadPoolExecutor::CPUTask
> {
67 using T
= CPUThreadPoolExecutor::CPUTask
;
68 using Ticket
= TicketExecutor::Ticket
;
70 // Interface specific to TicketQueue:
// Enqueue `item` under an explicit `ticket`. The priority queue is
// modified while holding m_lock.
72 BlockingQueueAddResult
add(T item
, Ticket ticket
) {
74 std::lock_guard
<std::mutex
> _
{m_lock
};
75 m_queue
.emplace(std::move(item
), ticket
);
// Hand out the next ticket by post-incrementing the atomic counter,
// so the first call returns 0 and each later call returns a larger
// value.
80 Ticket
stamp() { return m_next
++; }
82 // Implement interface that CPUThreadPoolExecutor expects:
// Ticket-less add: stamps a fresh ticket and forwards to the
// two-argument overload above.
84 BlockingQueueAddResult
add(T item
) override
{
85 return add(std::move(item
), stamp());
// Waits on m_sem for at most `time`; yields folly::none when that
// wait does not succeed.
93 folly::Optional
<T
> try_take_for(std::chrono::milliseconds time
) override
{
94 if (!m_sem
.try_wait_for(time
)) return folly::none
;
// Number of entries currently queued, read under m_lock.
98 size_t size() override
{
99 std::lock_guard
<std::mutex
> _
{m_lock
};
100 return m_queue
.size();
// Under m_lock: the queue must be non-empty at this point (an empty
// queue here is a logic error, hence the assertion).
104 std::lock_guard
<std::mutex
> _
{m_lock
};
105 assertx(!m_queue
.empty());
106 // This is only safe because m_item does not participate in the
107 // ordering priority_queue uses.
108 auto item
= std::move(m_queue
.top().m_item
);
// Bundle a task with the ticket it was enqueued under.
114 Pair(T item
, Ticket ticket
)
115 : m_item
{std::move(item
)}, m_ticket
{ticket
} {}
118 bool operator<(const Pair
& o
) const {
119 // Note the ordering here. We want *lower* tickets to come
// first. std::priority_queue surfaces the greatest element, so the
// comparison is deliberately reversed.
121 return o
.m_ticket
< m_ticket
;
// Source of the tickets returned by stamp(); starts at 0.
125 std::atomic
<Ticket
> m_next
{0};
127 // Just a priority queue protected by a lock. Performance
128 // measurements show this isn't anywhere near a bottleneck, and
129 // lock-free data structures are more complex.
130 std::priority_queue
<Pair
> m_queue
;
135 //////////////////////////////////////////////////////////////////////
137 // Executor which keeps a reference to its parent TicketExecutor and a
138 // Ticket. Every call to add() just delegates to the parent, but calls
139 // addWithTicket() instead.
// Instances are heap-allocated by create() and destroy themselves in
// keepAliveRelease() once the last reference is dropped.
140 struct StickyTicketExecutor
: Executor
{
141 ~StickyTicketExecutor() override
= default;
// Factory: allocates a new StickyTicketExecutor bound to parent `e`
// and ticket `t`, and wraps it with makeKeepAlive(). The raw `new` is
// balanced by the `delete this` in keepAliveRelease().
143 static Executor::KeepAlive
<StickyTicketExecutor
>
144 create(Executor::KeepAlive
<TicketExecutor
> e
,
145 TicketExecutor::Ticket t
) {
146 return makeKeepAlive(new StickyTicketExecutor(std::move(e
), t
));
// Neither copyable nor movable.
149 StickyTicketExecutor(const StickyTicketExecutor
&) = delete;
150 StickyTicketExecutor
& operator=(const StickyTicketExecutor
&) = delete;
151 StickyTicketExecutor(StickyTicketExecutor
&&) = delete;
152 StickyTicketExecutor
& operator=(StickyTicketExecutor
&&) = delete;
// Forward the work to the parent executor, tagged with the ticket
// fixed at construction.
154 void add(Func f
) override
{
155 m_parent
->addWithTicket(std::move(f
), m_ticket
);
159 // KeepAlive interface:
// Adds one reference. `count` receives the value from before the
// increment and is marked DEBUG_ONLY.
// NOTE(review): presumably only consumed by a debug assertion --
// confirm.
160 bool keepAliveAcquire() noexcept override
{
161 auto const DEBUG_ONLY count
=
162 m_counter
.fetch_add(1, std::memory_order_relaxed
);
// Drops one reference. fetch_sub returns the value from before the
// decrement, so count == 1 means this was the last reference and the
// object deletes itself.
167 void keepAliveRelease() noexcept override
{
168 auto const count
= m_counter
.fetch_sub(1, std::memory_order_acq_rel
);
170 if (count
== 1) delete this;
// Constructor used by create(): stores the parent keep-alive and the
// ticket.
174 StickyTicketExecutor(Executor::KeepAlive
<TicketExecutor
> parent
,
175 TicketExecutor::Ticket ticket
)
176 : m_parent
{std::move(parent
)}
177 , m_ticket
{ticket
} {}
// Reference count driven by keepAliveAcquire()/keepAliveRelease();
// starts at 1.
179 std::atomic
<ssize_t
> m_counter
{1};
// Keep-alive handle to the TicketExecutor that add() forwards to.
180 Executor::KeepAlive
<TicketExecutor
> m_parent
;
// Ticket attached to every task forwarded by add().
181 TicketExecutor::Ticket m_ticket
;
184 //////////////////////////////////////////////////////////////////////
188 //////////////////////////////////////////////////////////////////////
// Constructs the underlying CPUThreadPoolExecutor from three pieces:
// a (maxThreads, minThreads) pair, a TicketQueue as the task queue,
// and a ThreadFactory built from `threadPrefix` and the init/fini
// callbacks (both moved in). Afterwards `threadTimeout` is applied
// through setThreadDeathTimeout().
190 TicketExecutor::TicketExecutor(const std::string
& threadPrefix
,
193 std::function
<void()> threadInit
,
194 std::function
<void()> threadFini
,
195 std::chrono::milliseconds threadTimeout
)
196 : CPUThreadPoolExecutor
{
197 std::make_pair(maxThreads
, minThreads
),
198 std::make_unique
<TicketQueue
>(),
199 std::make_shared
<ThreadFactory
>(
200 threadPrefix
, std::move(threadInit
), std::move(threadFini
)
204 setThreadDeathTimeout(threadTimeout
);
207 TicketExecutor::~TicketExecutor() {
// Enqueue `func` on this executor's TicketQueue under `ticket`, and
// ask the pool to ensure enough active threads when that may be
// needed.
210 void TicketExecutor::addWithTicket(Func func
, Ticket ticket
) {
// Wrap the callable in a CPUTask.
// NOTE(review): the trailing arguments are presumably (expiration,
// expire callback, priority) -- confirm against CPUTask's constructor.
211 CPUTask task
{std::move(func
), std::chrono::milliseconds
{0}, nullptr, 0};
213 // See CPUThreadPoolExecutor.cpp for why this is needed.
// True when minThreads_ is zero, or when fewer threads are active
// than maxThreads_ allows.
214 auto const mayNeedToAddThreads
=
215 minThreads_
.load(std::memory_order_relaxed
) == 0 ||
216 (activeThreads_
.load(std::memory_order_relaxed
) <
217 maxThreads_
.load(std::memory_order_relaxed
));
// Hold a keep-alive token on this executor for the remainder of the
// call only in that case; otherwise `ka` is left empty.
219 Executor::KeepAlive
<> ka
= mayNeedToAddThreads
220 ? getKeepAliveToken(this)
221 : folly::Executor::KeepAlive
<>{};
// The task queue installed by the constructor is a TicketQueue, which
// is what the downcast relies on.
223 auto queue
= static_cast<TicketQueue
*>(getTaskQueue());
224 auto const result
= queue
->add(std::move(task
), ticket
);
// Only request more threads when the add did not reuse an existing
// one.
225 if (mayNeedToAddThreads
&& !result
.reusedThread
) ensureActiveThreads();
// Returns a fresh ticket by delegating to TicketQueue::stamp() on this
// executor's task queue (downcast from getTaskQueue()).
228 TicketExecutor::Ticket
TicketExecutor::stamp() {
229 return static_cast<TicketQueue
*>(getTaskQueue())->stamp();
// Returns a keep-alive handle to a new StickyTicketExecutor bound to
// this executor and `ticket`; work added through it is submitted via
// addWithTicket() with that ticket.
232 folly::Executor::KeepAlive
<> TicketExecutor::sticky(Ticket ticket
) {
233 return StickyTicketExecutor::create(this, ticket
);
236 //////////////////////////////////////////////////////////////////////