1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/worker_pool_linux.h"
9 #include "base/condition_variable.h"
10 #include "base/lock.h"
11 #include "base/platform_thread.h"
12 #include "base/task.h"
13 #include "base/waitable_event.h"
14 #include "testing/gtest/include/gtest/gtest.h"
18 // Peer class to provide passthrough access to LinuxDynamicThreadPool internals.
19 class LinuxDynamicThreadPool::LinuxDynamicThreadPoolPeer
{
21 explicit LinuxDynamicThreadPoolPeer(LinuxDynamicThreadPool
* pool
)
24 Lock
* lock() { return &pool_
->lock_
; }
25 ConditionVariable
* tasks_available_cv() {
26 return &pool_
->tasks_available_cv_
;
28 const std::queue
<Task
*>& tasks() const { return pool_
->tasks_
; }
29 int num_idle_threads() const { return pool_
->num_idle_threads_
; }
30 ConditionVariable
* num_idle_threads_cv() {
31 return pool_
->num_idle_threads_cv_
.get();
33 void set_num_idle_threads_cv(ConditionVariable
* cv
) {
34 pool_
->num_idle_threads_cv_
.reset(cv
);
38 LinuxDynamicThreadPool
* pool_
;
40 DISALLOW_COPY_AND_ASSIGN(LinuxDynamicThreadPoolPeer
);
47 // IncrementingTask's main purpose is to increment a counter. It also updates a
48 // set of unique thread ids, and signals a ConditionVariable on completion.
49 // Note that since it does not block, there is no way to control the number of
50 // threads used if more than one IncrementingTask is consecutively posted to the
51 // thread pool, since the first one might finish executing before the subsequent
52 // PostTask() calls get invoked.
53 class IncrementingTask
: public Task
{
55 IncrementingTask(Lock
* counter_lock
,
57 Lock
* unique_threads_lock
,
58 std::set
<PlatformThreadId
>* unique_threads
)
59 : counter_lock_(counter_lock
),
60 unique_threads_lock_(unique_threads_lock
),
61 unique_threads_(unique_threads
),
65 AddSelfToUniqueThreadSet();
66 AutoLock
locked(*counter_lock_
);
70 void AddSelfToUniqueThreadSet() {
71 AutoLock
locked(*unique_threads_lock_
);
72 unique_threads_
->insert(PlatformThread::CurrentId());
77 Lock
* unique_threads_lock_
;
78 std::set
<PlatformThreadId
>* unique_threads_
;
81 DISALLOW_COPY_AND_ASSIGN(IncrementingTask
);
84 // BlockingIncrementingTask is a simple wrapper around IncrementingTask that
85 // allows for waiting at the start of Run() for a WaitableEvent to be signalled.
86 class BlockingIncrementingTask
: public Task
{
88 BlockingIncrementingTask(Lock
* counter_lock
,
90 Lock
* unique_threads_lock
,
91 std::set
<PlatformThreadId
>* unique_threads
,
92 Lock
* num_waiting_to_start_lock
,
93 int* num_waiting_to_start
,
94 ConditionVariable
* num_waiting_to_start_cv
,
95 base::WaitableEvent
* start
)
97 counter_lock
, counter
, unique_threads_lock
, unique_threads
),
98 num_waiting_to_start_lock_(num_waiting_to_start_lock
),
99 num_waiting_to_start_(num_waiting_to_start
),
100 num_waiting_to_start_cv_(num_waiting_to_start_cv
),
105 AutoLock
num_waiting_to_start_locked(*num_waiting_to_start_lock_
);
106 (*num_waiting_to_start_
)++;
108 num_waiting_to_start_cv_
->Signal();
109 CHECK(start_
->Wait());
114 IncrementingTask incrementer_
;
115 Lock
* num_waiting_to_start_lock_
;
116 int* num_waiting_to_start_
;
117 ConditionVariable
* num_waiting_to_start_cv_
;
118 base::WaitableEvent
* start_
;
120 DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask
);
123 class LinuxDynamicThreadPoolTest
: public testing::Test
{
125 LinuxDynamicThreadPoolTest()
126 : pool_(new base::LinuxDynamicThreadPool("dynamic_pool", 60*60)),
129 num_waiting_to_start_(0),
130 num_waiting_to_start_cv_(&num_waiting_to_start_lock_
),
131 start_(true, false) {}
133 virtual void SetUp() {
134 peer_
.set_num_idle_threads_cv(new ConditionVariable(peer_
.lock()));
137 virtual void TearDown() {
138 // Wake up the idle threads so they can terminate.
139 if (pool_
.get()) pool_
->Terminate();
142 void WaitForTasksToStart(int num_tasks
) {
143 AutoLock
num_waiting_to_start_locked(num_waiting_to_start_lock_
);
144 while (num_waiting_to_start_
< num_tasks
) {
145 num_waiting_to_start_cv_
.Wait();
149 void WaitForIdleThreads(int num_idle_threads
) {
150 AutoLock
pool_locked(*peer_
.lock());
151 while (peer_
.num_idle_threads() < num_idle_threads
) {
152 peer_
.num_idle_threads_cv()->Wait();
156 Task
* CreateNewIncrementingTask() {
157 return new IncrementingTask(&counter_lock_
, &counter_
,
158 &unique_threads_lock_
, &unique_threads_
);
161 Task
* CreateNewBlockingIncrementingTask() {
162 return new BlockingIncrementingTask(
163 &counter_lock_
, &counter_
, &unique_threads_lock_
, &unique_threads_
,
164 &num_waiting_to_start_lock_
, &num_waiting_to_start_
,
165 &num_waiting_to_start_cv_
, &start_
);
168 scoped_refptr
<base::LinuxDynamicThreadPool
> pool_
;
169 base::LinuxDynamicThreadPool::LinuxDynamicThreadPoolPeer peer_
;
172 Lock unique_threads_lock_
;
173 std::set
<PlatformThreadId
> unique_threads_
;
174 Lock num_waiting_to_start_lock_
;
175 int num_waiting_to_start_
;
176 ConditionVariable num_waiting_to_start_cv_
;
177 base::WaitableEvent start_
;
180 TEST_F(LinuxDynamicThreadPoolTest
, Basic
) {
181 EXPECT_EQ(0, peer_
.num_idle_threads());
182 EXPECT_EQ(0U, unique_threads_
.size());
183 EXPECT_EQ(0U, peer_
.tasks().size());
185 // Add one task and wait for it to be completed.
186 pool_
->PostTask(CreateNewIncrementingTask());
188 WaitForIdleThreads(1);
190 EXPECT_EQ(1U, unique_threads_
.size()) <<
191 "There should be only one thread allocated for one task.";
192 EXPECT_EQ(1, peer_
.num_idle_threads());
193 EXPECT_EQ(1, counter_
);
196 TEST_F(LinuxDynamicThreadPoolTest
, ReuseIdle
) {
197 // Add one task and wait for it to be completed.
198 pool_
->PostTask(CreateNewIncrementingTask());
200 WaitForIdleThreads(1);
202 // Add another 2 tasks. One should reuse the existing worker thread.
203 pool_
->PostTask(CreateNewBlockingIncrementingTask());
204 pool_
->PostTask(CreateNewBlockingIncrementingTask());
206 WaitForTasksToStart(2);
208 WaitForIdleThreads(2);
210 EXPECT_EQ(2U, unique_threads_
.size());
211 EXPECT_EQ(2, peer_
.num_idle_threads());
212 EXPECT_EQ(3, counter_
);
215 TEST_F(LinuxDynamicThreadPoolTest
, TwoActiveTasks
) {
216 // Add two blocking tasks.
217 pool_
->PostTask(CreateNewBlockingIncrementingTask());
218 pool_
->PostTask(CreateNewBlockingIncrementingTask());
220 EXPECT_EQ(0, counter_
) << "Blocking tasks should not have started yet.";
222 WaitForTasksToStart(2);
224 WaitForIdleThreads(2);
226 EXPECT_EQ(2U, unique_threads_
.size());
227 EXPECT_EQ(2, peer_
.num_idle_threads()) << "Existing threads are now idle.";
228 EXPECT_EQ(2, counter_
);
231 TEST_F(LinuxDynamicThreadPoolTest
, Complex
) {
232 // Add two non blocking tasks and wait for them to finish.
233 pool_
->PostTask(CreateNewIncrementingTask());
235 WaitForIdleThreads(1);
237 // Add two blocking tasks, start them simultaneously, and wait for them to
239 pool_
->PostTask(CreateNewBlockingIncrementingTask());
240 pool_
->PostTask(CreateNewBlockingIncrementingTask());
242 WaitForTasksToStart(2);
244 WaitForIdleThreads(2);
246 EXPECT_EQ(3, counter_
);
247 EXPECT_EQ(2, peer_
.num_idle_threads());
248 EXPECT_EQ(2U, unique_threads_
.size());
250 // Wake up all idle threads so they can exit.
252 AutoLock
locked(*peer_
.lock());
253 while (peer_
.num_idle_threads() > 0) {
254 peer_
.tasks_available_cv()->Signal();
255 peer_
.num_idle_threads_cv()->Wait();
259 // Add another non blocking task. There are no threads to reuse.
260 pool_
->PostTask(CreateNewIncrementingTask());
261 WaitForIdleThreads(1);
263 EXPECT_EQ(3U, unique_threads_
.size());
264 EXPECT_EQ(1, peer_
.num_idle_threads());
265 EXPECT_EQ(4, counter_
);