Added llvm into exceptions as we can't add README.chromium into 3rd party repository
[chromium-blink-merge.git] / base / threading / sequenced_worker_pool_unittest.cc
blobed5f89694983ca707b977990a59982ca5a9fb83b
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
7 #include <algorithm>
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/synchronization/condition_variable.h"
16 #include "base/synchronization/lock.h"
17 #include "base/test/sequenced_task_runner_test_template.h"
18 #include "base/test/sequenced_worker_pool_owner.h"
19 #include "base/test/task_runner_test_template.h"
20 #include "base/test/test_timeouts.h"
21 #include "base/threading/platform_thread.h"
22 #include "base/time/time.h"
23 #include "base/tracked_objects.h"
24 #include "testing/gtest/include/gtest/gtest.h"
26 namespace base {
28 // IMPORTANT NOTE:
30 // Many of these tests have failure modes where they'll hang forever. These
31 // tests should not be flaky, and hanging indicates a type of failure. Do not
32 // mark as flaky if they're hanging, it's likely an actual bug.
34 namespace {
36 const size_t kNumWorkerThreads = 3;
38 // Allows a number of threads to all be blocked on the same event, and
39 // provides a way to unblock a certain number of them.
40 class ThreadBlocker {
41 public:
42 ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}
44 void Block() {
46 base::AutoLock lock(lock_);
47 while (unblock_counter_ == 0)
48 cond_var_.Wait();
49 unblock_counter_--;
51 cond_var_.Signal();
54 void Unblock(size_t count) {
56 base::AutoLock lock(lock_);
57 DCHECK(unblock_counter_ == 0);
58 unblock_counter_ = count;
60 cond_var_.Signal();
63 private:
64 base::Lock lock_;
65 base::ConditionVariable cond_var_;
67 size_t unblock_counter_;
70 class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
71 public:
72 TestTracker()
73 : lock_(),
74 cond_var_(&lock_),
75 started_events_(0) {
78 // Each of these tasks appends the argument to the complete sequence vector
79 // so calling code can see what order they finished in.
80 void FastTask(int id) {
81 SignalWorkerDone(id);
84 void SlowTask(int id) {
85 base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
86 SignalWorkerDone(id);
89 void BlockTask(int id, ThreadBlocker* blocker) {
90 // Note that this task has started and signal anybody waiting for that
91 // to happen.
93 base::AutoLock lock(lock_);
94 started_events_++;
96 cond_var_.Signal();
98 blocker->Block();
99 SignalWorkerDone(id);
102 void PostAdditionalTasks(
103 int id, SequencedWorkerPool* pool,
104 bool expected_return_value) {
105 Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
106 EXPECT_EQ(expected_return_value,
107 pool->PostWorkerTaskWithShutdownBehavior(
108 FROM_HERE, fast_task,
109 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
110 EXPECT_EQ(expected_return_value,
111 pool->PostWorkerTaskWithShutdownBehavior(
112 FROM_HERE, fast_task,
113 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
114 pool->PostWorkerTaskWithShutdownBehavior(
115 FROM_HERE, fast_task,
116 SequencedWorkerPool::BLOCK_SHUTDOWN);
117 SignalWorkerDone(id);
120 // Waits until the given number of tasks have started executing.
121 void WaitUntilTasksBlocked(size_t count) {
123 base::AutoLock lock(lock_);
124 while (started_events_ < count)
125 cond_var_.Wait();
127 cond_var_.Signal();
130 // Blocks the current thread until at least the given number of tasks are in
131 // the completed vector, and then returns a copy.
132 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
133 std::vector<int> ret;
135 base::AutoLock lock(lock_);
136 while (complete_sequence_.size() < num_tasks)
137 cond_var_.Wait();
138 ret = complete_sequence_;
140 cond_var_.Signal();
141 return ret;
144 size_t GetTasksCompletedCount() {
145 base::AutoLock lock(lock_);
146 return complete_sequence_.size();
149 void ClearCompleteSequence() {
150 base::AutoLock lock(lock_);
151 complete_sequence_.clear();
152 started_events_ = 0;
155 private:
156 friend class base::RefCountedThreadSafe<TestTracker>;
157 ~TestTracker() {}
159 void SignalWorkerDone(int id) {
161 base::AutoLock lock(lock_);
162 complete_sequence_.push_back(id);
164 cond_var_.Signal();
167 // Protects the complete_sequence.
168 base::Lock lock_;
170 base::ConditionVariable cond_var_;
172 // Protected by lock_.
173 std::vector<int> complete_sequence_;
175 // Counter of the number of "block" workers that have started.
176 size_t started_events_;
179 class SequencedWorkerPoolTest : public testing::Test {
180 public:
181 SequencedWorkerPoolTest()
182 : tracker_(new TestTracker) {
183 ResetPool();
186 void TearDown() override { pool()->Shutdown(); }
188 const scoped_refptr<SequencedWorkerPool>& pool() {
189 return pool_owner_->pool();
191 TestTracker* tracker() { return tracker_.get(); }
193 // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
194 // down, and creates a new instance.
195 void ResetPool() {
196 pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
199 void SetWillWaitForShutdownCallback(const Closure& callback) {
200 pool_owner_->SetWillWaitForShutdownCallback(callback);
203 // Ensures that the given number of worker threads is created by adding
204 // tasks and waiting until they complete. Worker thread creation is
205 // serialized, can happen on background threads asynchronously, and doesn't
206 // happen any more at shutdown. This means that if a test posts a bunch of
207 // tasks and calls shutdown, fewer workers will be created than the test may
208 // expect.
210 // This function ensures that this condition can't happen so tests can make
211 // assumptions about the number of workers active. See the comment in
212 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
213 // details.
215 // It will post tasks to the queue with id -1. It also assumes this is the
216 // first thing called in a test since it will clear the complete_sequence_.
217 void EnsureAllWorkersCreated() {
218 // Create a bunch of threads, all waiting. This will cause that may
219 // workers to be created.
220 ThreadBlocker blocker;
221 for (size_t i = 0; i < kNumWorkerThreads; i++) {
222 pool()->PostWorkerTask(FROM_HERE,
223 base::Bind(&TestTracker::BlockTask,
224 tracker(), -1, &blocker));
226 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
228 // Now wake them up and wait until they're done.
229 blocker.Unblock(kNumWorkerThreads);
230 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
232 // Clean up the task IDs we added.
233 tracker()->ClearCompleteSequence();
236 int has_work_call_count() const {
237 return pool_owner_->has_work_call_count();
240 private:
241 MessageLoop message_loop_;
242 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
243 const scoped_refptr<TestTracker> tracker_;
246 // Checks that the given number of entries are in the tasks to complete of
247 // the given tracker, and then signals the given event the given number of
248 // times. This is used to wakt up blocked background threads before blocking
249 // on shutdown.
250 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
251 size_t expected_tasks_to_complete,
252 ThreadBlocker* blocker,
253 size_t threads_to_awake) {
254 EXPECT_EQ(
255 expected_tasks_to_complete,
256 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
258 blocker->Unblock(threads_to_awake);
261 class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
262 public:
263 explicit DeletionHelper(
264 const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
265 : deleted_flag_(deleted_flag) {
268 private:
269 friend class base::RefCountedThreadSafe<DeletionHelper>;
270 virtual ~DeletionHelper() { deleted_flag_->data = true; }
272 const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;
273 DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
276 void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
277 const scoped_refptr<DeletionHelper>& helper) {
278 ADD_FAILURE() << "Should never run";
281 // Tests that delayed tasks are deleted upon shutdown of the pool.
282 TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
283 // Post something to verify the pool is started up.
284 EXPECT_TRUE(pool()->PostTask(
285 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
287 scoped_refptr<base::RefCountedData<bool> > deleted_flag(
288 new base::RefCountedData<bool>(false));
290 base::Time posted_at(base::Time::Now());
291 // Post something that shouldn't run.
292 EXPECT_TRUE(pool()->PostDelayedTask(
293 FROM_HERE,
294 base::Bind(&HoldPoolReference,
295 pool(),
296 make_scoped_refptr(new DeletionHelper(deleted_flag))),
297 TestTimeouts::action_timeout()));
299 std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
300 ASSERT_EQ(1u, completion_sequence.size());
301 ASSERT_EQ(1, completion_sequence[0]);
303 pool()->Shutdown();
304 // Shutdown is asynchronous, so use ResetPool() to block until the pool is
305 // fully destroyed (and thus shut down).
306 ResetPool();
308 // Verify that we didn't block until the task was due.
309 ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
311 // Verify that the deferred task has not only not run, but has also been
312 // destroyed.
313 ASSERT_TRUE(deleted_flag->data);
316 // Tests that same-named tokens have the same ID.
317 TEST_F(SequencedWorkerPoolTest, NamedTokens) {
318 const std::string name1("hello");
319 SequencedWorkerPool::SequenceToken token1 =
320 pool()->GetNamedSequenceToken(name1);
322 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
324 const std::string name3("goodbye");
325 SequencedWorkerPool::SequenceToken token3 =
326 pool()->GetNamedSequenceToken(name3);
328 // All 3 tokens should be different.
329 EXPECT_FALSE(token1.Equals(token2));
330 EXPECT_FALSE(token1.Equals(token3));
331 EXPECT_FALSE(token2.Equals(token3));
333 // Requesting the same name again should give the same value.
334 SequencedWorkerPool::SequenceToken token1again =
335 pool()->GetNamedSequenceToken(name1);
336 EXPECT_TRUE(token1.Equals(token1again));
338 SequencedWorkerPool::SequenceToken token3again =
339 pool()->GetNamedSequenceToken(name3);
340 EXPECT_TRUE(token3.Equals(token3again));
343 // Tests that posting a bunch of tasks (many more than the number of worker
344 // threads) runs them all.
345 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
346 pool()->PostWorkerTask(FROM_HERE,
347 base::Bind(&TestTracker::SlowTask, tracker(), 0));
349 const size_t kNumTasks = 20;
350 for (size_t i = 1; i < kNumTasks; i++) {
351 pool()->PostWorkerTask(FROM_HERE,
352 base::Bind(&TestTracker::FastTask, tracker(), i));
355 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
356 EXPECT_EQ(kNumTasks, result.size());
359 // Tests that posting a bunch of tasks (many more than the number of
360 // worker threads) to two pools simultaneously runs them all twice.
361 // This test is meant to shake out any concurrency issues between
362 // pools (like histograms).
363 TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
364 SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
365 SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
367 base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
368 pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
369 pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);
371 const size_t kNumTasks = 20;
372 for (size_t i = 1; i < kNumTasks; i++) {
373 base::Closure fast_task =
374 base::Bind(&TestTracker::FastTask, tracker(), i);
375 pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
376 pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
379 std::vector<int> result =
380 tracker()->WaitUntilTasksComplete(2*kNumTasks);
381 EXPECT_EQ(2 * kNumTasks, result.size());
383 pool2.pool()->Shutdown();
384 pool1.pool()->Shutdown();
387 // Test that tasks with the same sequence token are executed in order but don't
388 // affect other tasks.
389 TEST_F(SequencedWorkerPoolTest, Sequence) {
390 // Fill all the worker threads except one.
391 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
392 ThreadBlocker background_blocker;
393 for (size_t i = 0; i < kNumBackgroundTasks; i++) {
394 pool()->PostWorkerTask(FROM_HERE,
395 base::Bind(&TestTracker::BlockTask,
396 tracker(), i, &background_blocker));
398 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
400 // Create two tasks with the same sequence token, one that will block on the
401 // event, and one which will just complete quickly when it's run. Since there
402 // is one worker thread free, the first task will start and then block, and
403 // the second task should be waiting.
404 ThreadBlocker blocker;
405 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
406 pool()->PostSequencedWorkerTask(
407 token1, FROM_HERE,
408 base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
409 pool()->PostSequencedWorkerTask(
410 token1, FROM_HERE,
411 base::Bind(&TestTracker::FastTask, tracker(), 101));
412 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
414 // Create another two tasks as above with a different token. These will be
415 // blocked since there are no slots to run.
416 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
417 pool()->PostSequencedWorkerTask(
418 token2, FROM_HERE,
419 base::Bind(&TestTracker::FastTask, tracker(), 200));
420 pool()->PostSequencedWorkerTask(
421 token2, FROM_HERE,
422 base::Bind(&TestTracker::FastTask, tracker(), 201));
423 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
425 // Let one background task complete. This should then let both tasks of
426 // token2 run to completion in order. The second task of token1 should still
427 // be blocked.
428 background_blocker.Unblock(1);
429 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
430 ASSERT_EQ(3u, result.size());
431 EXPECT_EQ(200, result[1]);
432 EXPECT_EQ(201, result[2]);
434 // Finish the rest of the background tasks. This should leave some workers
435 // free with the second token1 task still blocked on the first.
436 background_blocker.Unblock(kNumBackgroundTasks - 1);
437 EXPECT_EQ(kNumBackgroundTasks + 2,
438 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
440 // Allow the first task of token1 to complete. This should run the second.
441 blocker.Unblock(1);
442 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
443 ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
444 EXPECT_EQ(100, result[result.size() - 2]);
445 EXPECT_EQ(101, result[result.size() - 1]);
448 // Tests that any tasks posted after Shutdown are ignored.
449 // Disabled for flakiness. See http://crbug.com/166451.
450 TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
451 // Start tasks to take all the threads and block them.
452 EnsureAllWorkersCreated();
453 ThreadBlocker blocker;
454 for (size_t i = 0; i < kNumWorkerThreads; i++) {
455 pool()->PostWorkerTask(FROM_HERE,
456 base::Bind(&TestTracker::BlockTask,
457 tracker(), i, &blocker));
459 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
461 SetWillWaitForShutdownCallback(
462 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
463 scoped_refptr<TestTracker>(tracker()), 0,
464 &blocker, kNumWorkerThreads));
466 // Shutdown the worker pool. This should discard all non-blocking tasks.
467 const int kMaxNewBlockingTasksAfterShutdown = 100;
468 pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
470 int old_has_work_call_count = has_work_call_count();
472 std::vector<int> result =
473 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
475 // The kNumWorkerThread items should have completed, in no particular order.
476 ASSERT_EQ(kNumWorkerThreads, result.size());
477 for (size_t i = 0; i < kNumWorkerThreads; i++) {
478 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
479 result.end());
482 // No further tasks, regardless of shutdown mode, should be allowed.
483 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
484 FROM_HERE,
485 base::Bind(&TestTracker::FastTask, tracker(), 100),
486 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
487 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
488 FROM_HERE,
489 base::Bind(&TestTracker::FastTask, tracker(), 101),
490 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
491 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
492 FROM_HERE,
493 base::Bind(&TestTracker::FastTask, tracker(), 102),
494 SequencedWorkerPool::BLOCK_SHUTDOWN));
496 ASSERT_EQ(old_has_work_call_count, has_work_call_count());
499 TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
500 // Test that <n> new blocking tasks are allowed provided they're posted
501 // by a running tasks.
502 EnsureAllWorkersCreated();
503 ThreadBlocker blocker;
505 // Start tasks to take all the threads and block them.
506 const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
507 for (int i = 0; i < kNumBlockTasks; ++i) {
508 EXPECT_TRUE(pool()->PostWorkerTask(
509 FROM_HERE,
510 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
512 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
514 // Queue up shutdown blocking tasks behind those which will attempt to post
515 // additional tasks when run, PostAdditionalTasks attemtps to post 3
516 // new FastTasks, one for each shutdown_behavior.
517 const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
518 for (int i = 0; i < kNumQueuedTasks; ++i) {
519 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
520 FROM_HERE,
521 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
522 false),
523 SequencedWorkerPool::BLOCK_SHUTDOWN));
526 // Setup to open the floodgates from within Shutdown().
527 SetWillWaitForShutdownCallback(
528 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
529 scoped_refptr<TestTracker>(tracker()),
530 0, &blocker, kNumBlockTasks));
532 // Allow half of the additional blocking tasks thru.
533 const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
534 pool()->Shutdown(kNumNewBlockingTasksToAllow);
536 // Ensure that the correct number of tasks actually got run.
537 tracker()->WaitUntilTasksComplete(static_cast<size_t>(
538 kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
540 // Clean up the task IDs we added and go home.
541 tracker()->ClearCompleteSequence();
544 // Tests that unrun tasks are discarded properly according to their shutdown
545 // mode.
546 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
547 // Start tasks to take all the threads and block them.
548 EnsureAllWorkersCreated();
549 ThreadBlocker blocker;
550 for (size_t i = 0; i < kNumWorkerThreads; i++) {
551 pool()->PostWorkerTask(FROM_HERE,
552 base::Bind(&TestTracker::BlockTask,
553 tracker(), i, &blocker));
555 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
557 // Create some tasks with different shutdown modes.
558 pool()->PostWorkerTaskWithShutdownBehavior(
559 FROM_HERE,
560 base::Bind(&TestTracker::FastTask, tracker(), 100),
561 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
562 pool()->PostWorkerTaskWithShutdownBehavior(
563 FROM_HERE,
564 base::Bind(&TestTracker::FastTask, tracker(), 101),
565 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
566 pool()->PostWorkerTaskWithShutdownBehavior(
567 FROM_HERE,
568 base::Bind(&TestTracker::FastTask, tracker(), 102),
569 SequencedWorkerPool::BLOCK_SHUTDOWN);
571 // Shutdown the worker pool. This should discard all non-blocking tasks.
572 SetWillWaitForShutdownCallback(
573 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
574 scoped_refptr<TestTracker>(tracker()), 0,
575 &blocker, kNumWorkerThreads));
576 pool()->Shutdown();
578 std::vector<int> result =
579 tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
581 // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
582 // one, in no particular order.
583 ASSERT_EQ(kNumWorkerThreads + 1, result.size());
584 for (size_t i = 0; i < kNumWorkerThreads; i++) {
585 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
586 result.end());
588 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
591 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
592 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
593 scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
594 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
595 scoped_refptr<SequencedTaskRunner> sequenced_runner(
596 pool()->GetSequencedTaskRunnerWithShutdownBehavior(
597 pool()->GetSequenceToken(),
598 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
599 EnsureAllWorkersCreated();
600 ThreadBlocker blocker;
601 pool()->PostWorkerTaskWithShutdownBehavior(
602 FROM_HERE,
603 base::Bind(&TestTracker::BlockTask,
604 tracker(), 0, &blocker),
605 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
606 runner->PostTask(
607 FROM_HERE,
608 base::Bind(&TestTracker::BlockTask,
609 tracker(), 1, &blocker));
610 sequenced_runner->PostTask(
611 FROM_HERE,
612 base::Bind(&TestTracker::BlockTask,
613 tracker(), 2, &blocker));
615 tracker()->WaitUntilTasksBlocked(3);
617 // This should not block. If this test hangs, it means it failed.
618 pool()->Shutdown();
620 // The task should not have completed yet.
621 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
623 // Posting more tasks should fail.
624 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
625 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
626 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
627 EXPECT_FALSE(runner->PostTask(
628 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
629 EXPECT_FALSE(sequenced_runner->PostTask(
630 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
632 // Continue the background thread and make sure the tasks can complete.
633 blocker.Unblock(3);
634 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
635 EXPECT_EQ(3u, result.size());
638 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
639 // until they stop, but tasks not yet started do not.
640 TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
641 // Start tasks to take all the threads and block them.
642 EnsureAllWorkersCreated();
643 ThreadBlocker blocker;
645 // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
646 // return until these tasks have completed.
647 for (size_t i = 0; i < kNumWorkerThreads; i++) {
648 pool()->PostWorkerTaskWithShutdownBehavior(
649 FROM_HERE,
650 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
651 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
653 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
655 // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
656 // executed once Shutdown() has been called.
657 pool()->PostWorkerTaskWithShutdownBehavior(
658 FROM_HERE,
659 base::Bind(&TestTracker::BlockTask,
660 tracker(), 0, &blocker),
661 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
663 // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
664 // been started block shutdown.
665 SetWillWaitForShutdownCallback(
666 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
667 scoped_refptr<TestTracker>(tracker()), 0,
668 &blocker, kNumWorkerThreads));
670 // No tasks should have completed yet.
671 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
673 // This should not block. If this test hangs, it means it failed.
674 pool()->Shutdown();
676 // Shutdown should not return until all of the tasks have completed.
677 std::vector<int> result =
678 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
680 // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
681 // allowed to complete. No additional non-blocking tasks should have been
682 // started.
683 ASSERT_EQ(kNumWorkerThreads, result.size());
684 for (size_t i = 0; i < kNumWorkerThreads; i++) {
685 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
686 result.end());
690 // Ensure all worker threads are created, and then trigger a spurious
691 // work signal. This shouldn't cause any other work signals to be
692 // triggered. This is a regression test for http://crbug.com/117469.
693 TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
694 EnsureAllWorkersCreated();
695 int old_has_work_call_count = has_work_call_count();
696 pool()->SignalHasWorkForTesting();
697 // This is inherently racy, but can only produce false positives.
698 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
699 EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
702 void IsRunningOnCurrentThreadTask(
703 SequencedWorkerPool::SequenceToken test_positive_token,
704 SequencedWorkerPool::SequenceToken test_negative_token,
705 SequencedWorkerPool* pool,
706 SequencedWorkerPool* unused_pool) {
707 EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
708 EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
709 EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
710 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
711 EXPECT_FALSE(
712 unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
713 EXPECT_FALSE(
714 unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
717 // Verify correctness of the IsRunningSequenceOnCurrentThread method.
718 TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
719 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
720 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
721 SequencedWorkerPool::SequenceToken unsequenced_token;
723 scoped_refptr<SequencedWorkerPool> unused_pool =
724 new SequencedWorkerPool(2, "unused_pool");
726 EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
727 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
728 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
729 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
730 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
731 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
732 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
733 EXPECT_FALSE(
734 unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
736 pool()->PostSequencedWorkerTask(
737 token1, FROM_HERE,
738 base::Bind(&IsRunningOnCurrentThreadTask,
739 token1, token2, pool(), unused_pool));
740 pool()->PostSequencedWorkerTask(
741 token2, FROM_HERE,
742 base::Bind(&IsRunningOnCurrentThreadTask,
743 token2, unsequenced_token, pool(), unused_pool));
744 pool()->PostWorkerTask(
745 FROM_HERE,
746 base::Bind(&IsRunningOnCurrentThreadTask,
747 unsequenced_token, token1, pool(), unused_pool));
748 pool()->Shutdown();
749 unused_pool->Shutdown();
752 // Verify that FlushForTesting works as intended.
753 TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
754 // Should be fine to call on a new instance.
755 pool()->FlushForTesting();
757 // Queue up a bunch of work, including a long delayed task and
758 // a task that produces additional tasks as an artifact.
759 pool()->PostDelayedWorkerTask(
760 FROM_HERE,
761 base::Bind(&TestTracker::FastTask, tracker(), 0),
762 TimeDelta::FromMinutes(5));
763 pool()->PostWorkerTask(FROM_HERE,
764 base::Bind(&TestTracker::SlowTask, tracker(), 0));
765 const size_t kNumFastTasks = 20;
766 for (size_t i = 0; i < kNumFastTasks; i++) {
767 pool()->PostWorkerTask(FROM_HERE,
768 base::Bind(&TestTracker::FastTask, tracker(), 0));
770 pool()->PostWorkerTask(
771 FROM_HERE,
772 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
773 true));
775 // We expect all except the delayed task to have been run. We verify all
776 // closures have been deleted by looking at the refcount of the
777 // tracker.
778 EXPECT_FALSE(tracker()->HasOneRef());
779 pool()->FlushForTesting();
780 EXPECT_TRUE(tracker()->HasOneRef());
781 EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
783 // Should be fine to call on an idle instance with all threads created, and
784 // spamming the method shouldn't deadlock or confuse the class.
785 pool()->FlushForTesting();
786 pool()->FlushForTesting();
788 // Should be fine to call after shutdown too.
789 pool()->Shutdown();
790 pool()->FlushForTesting();
793 TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
794 MessageLoop loop;
795 scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
796 scoped_refptr<SequencedTaskRunner> task_runner =
797 pool->GetSequencedTaskRunnerWithShutdownBehavior(
798 pool->GetSequenceToken(),
799 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
801 // Upon test exit, should shut down without hanging.
802 pool->Shutdown();
805 class SequencedWorkerPoolTaskRunnerTestDelegate {
806 public:
807 SequencedWorkerPoolTaskRunnerTestDelegate() {}
809 ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
811 void StartTaskRunner() {
812 pool_owner_.reset(
813 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
816 scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
817 return pool_owner_->pool();
820 void StopTaskRunner() {
821 // Make sure all tasks are run before shutting down. Delayed tasks are
822 // not run, they're simply deleted.
823 pool_owner_->pool()->FlushForTesting();
824 pool_owner_->pool()->Shutdown();
825 // Don't reset |pool_owner_| here, as the test may still hold a
826 // reference to the pool.
829 private:
830 MessageLoop message_loop_;
831 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
834 INSTANTIATE_TYPED_TEST_CASE_P(
835 SequencedWorkerPool, TaskRunnerTest,
836 SequencedWorkerPoolTaskRunnerTestDelegate);
838 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
839 public:
840 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
842 ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
845 void StartTaskRunner() {
846 pool_owner_.reset(
847 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
848 task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
849 SequencedWorkerPool::BLOCK_SHUTDOWN);
852 scoped_refptr<TaskRunner> GetTaskRunner() {
853 return task_runner_;
856 void StopTaskRunner() {
857 // Make sure all tasks are run before shutting down. Delayed tasks are
858 // not run, they're simply deleted.
859 pool_owner_->pool()->FlushForTesting();
860 pool_owner_->pool()->Shutdown();
861 // Don't reset |pool_owner_| here, as the test may still hold a
862 // reference to the pool.
865 private:
866 MessageLoop message_loop_;
867 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
868 scoped_refptr<TaskRunner> task_runner_;
871 INSTANTIATE_TYPED_TEST_CASE_P(
872 SequencedWorkerPoolTaskRunner, TaskRunnerTest,
873 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
875 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
876 public:
877 SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
879 ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
882 void StartTaskRunner() {
883 pool_owner_.reset(new SequencedWorkerPoolOwner(
884 10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
885 task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
886 pool_owner_->pool()->GetSequenceToken());
889 scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
890 return task_runner_;
893 void StopTaskRunner() {
894 // Make sure all tasks are run before shutting down. Delayed tasks are
895 // not run, they're simply deleted.
896 pool_owner_->pool()->FlushForTesting();
897 pool_owner_->pool()->Shutdown();
898 // Don't reset |pool_owner_| here, as the test may still hold a
899 // reference to the pool.
902 private:
903 MessageLoop message_loop_;
904 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
905 scoped_refptr<SequencedTaskRunner> task_runner_;
908 INSTANTIATE_TYPED_TEST_CASE_P(
909 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
910 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
912 INSTANTIATE_TYPED_TEST_CASE_P(
913 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
914 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
916 } // namespace
918 } // namespace base