This source file includes the following definitions.
- unblock_counter_
- Block
- Unblock
- started_events_
- FastTask
- SlowTask
- BlockTask
- PostAdditionalTasks
- WaitUntilTasksBlocked
- WaitUntilTasksComplete
- GetTasksCompletedCount
- ClearCompleteSequence
- SignalWorkerDone
- SetUp
- TearDown
- pool
- tracker
- ResetPool
- SetWillWaitForShutdownCallback
- EnsureAllWorkersCreated
- has_work_call_count
- EnsureTasksToCompleteCountAndUnblock
- HoldPoolReference
- TEST_F
- TEST_F
- TEST_F
- TEST_F
- TEST_F
- TEST_F
- TEST_F
- TEST_F
- TEST_F
- TEST_F
- TEST_F
- IsRunningOnCurrentThreadTask
- TEST_F
- TEST_F
- TEST
- StartTaskRunner
- GetTaskRunner
- StopTaskRunner
- StartTaskRunner
- GetTaskRunner
- StopTaskRunner
- StartTaskRunner
- GetTaskRunner
- StopTaskRunner
#include "base/threading/sequenced_worker_pool.h"
#include <algorithm>
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/test/sequenced_task_runner_test_template.h"
#include "base/test/sequenced_worker_pool_owner.h"
#include "base/test/task_runner_test_template.h"
#include "base/test/test_timeouts.h"
#include "base/threading/platform_thread.h"
#include "base/time/time.h"
#include "base/tracked_objects.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
namespace {
// Number of worker threads given to the pool that the test fixture creates.
// Several tests post exactly this many blocking tasks so that every worker is
// occupied.
const size_t kNumWorkerThreads = 3;
// Lets a test park threads inside Block() until the test hands out release
// "tickets" through Unblock().
class ThreadBlocker {
 public:
  ThreadBlocker() : cond_var_(&lock_), unblock_counter_(0) {}

  // Waits until at least one ticket is available and consumes it. After
  // releasing the lock the condition variable is signalled again so that
  // another waiter gets a chance to re-check the counter.
  void Block() {
    {
      base::AutoLock auto_lock(lock_);
      while (!unblock_counter_) {
        cond_var_.Wait();
      }
      --unblock_counter_;
    }
    cond_var_.Signal();
  }

  // Makes |count| tickets available. Every previously issued ticket must
  // already have been consumed (checked with DCHECK).
  void Unblock(size_t count) {
    {
      base::AutoLock auto_lock(lock_);
      DCHECK(unblock_counter_ == 0);
      unblock_counter_ = count;
    }
    cond_var_.Signal();
  }

 private:
  base::Lock lock_;
  base::ConditionVariable cond_var_;

  // Number of Block() calls that may still return; guarded by |lock_|.
  size_t unblock_counter_;
};
// Records, in completion order, the ids of the tasks that the tests post, and
// lets the test thread wait for tasks to start or finish.
class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
 public:
  TestTracker() : cond_var_(&lock_), started_events_(0) {}

  // Completes immediately, recording |id|.
  void FastTask(int id) { SignalWorkerDone(id); }

  // Sleeps for one second and then records |id|.
  void SlowTask(int id) {
    base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
    SignalWorkerDone(id);
  }

  // Announces that the task has started, parks on |blocker| until released,
  // and then records |id|.
  void BlockTask(int id, ThreadBlocker* blocker) {
    {
      base::AutoLock auto_lock(lock_);
      ++started_events_;
    }
    cond_var_.Signal();
    blocker->Block();
    SignalWorkerDone(id);
  }

  // Posts one FastTask (id 100) per shutdown behavior to |pool|. The results
  // of the CONTINUE_ON_SHUTDOWN and SKIP_ON_SHUTDOWN posts must equal
  // |expected_return_value|; the result of the BLOCK_SHUTDOWN post is not
  // checked. Finally records |id|.
  void PostAdditionalTasks(
      int id, SequencedWorkerPool* pool,
      bool expected_return_value) {
    Closure extra_task = base::Bind(&TestTracker::FastTask, this, 100);

    const bool posted_continue = pool->PostWorkerTaskWithShutdownBehavior(
        FROM_HERE, extra_task, SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
    EXPECT_EQ(expected_return_value, posted_continue);

    const bool posted_skip = pool->PostWorkerTaskWithShutdownBehavior(
        FROM_HERE, extra_task, SequencedWorkerPool::SKIP_ON_SHUTDOWN);
    EXPECT_EQ(expected_return_value, posted_skip);

    pool->PostWorkerTaskWithShutdownBehavior(
        FROM_HERE, extra_task, SequencedWorkerPool::BLOCK_SHUTDOWN);

    SignalWorkerDone(id);
  }

  // Blocks the caller until at least |count| BlockTask() invocations have
  // started.
  void WaitUntilTasksBlocked(size_t count) {
    {
      base::AutoLock auto_lock(lock_);
      while (started_events_ < count) {
        cond_var_.Wait();
      }
    }
    cond_var_.Signal();
  }

  // Blocks the caller until at least |num_tasks| tasks have completed and
  // returns a copy of the completion order observed at that moment.
  std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
    std::vector<int> snapshot;
    {
      base::AutoLock auto_lock(lock_);
      while (complete_sequence_.size() < num_tasks) {
        cond_var_.Wait();
      }
      snapshot.assign(complete_sequence_.begin(), complete_sequence_.end());
    }
    cond_var_.Signal();
    return snapshot;
  }

  // Returns how many tasks have completed so far.
  size_t GetTasksCompletedCount() {
    base::AutoLock auto_lock(lock_);
    return complete_sequence_.size();
  }

  // Forgets all recorded completions and started-task events.
  void ClearCompleteSequence() {
    base::AutoLock auto_lock(lock_);
    started_events_ = 0;
    complete_sequence_.clear();
  }

 private:
  friend class base::RefCountedThreadSafe<TestTracker>;
  ~TestTracker() {}

  // Appends |id| to the completion list and wakes a waiter.
  void SignalWorkerDone(int id) {
    {
      base::AutoLock auto_lock(lock_);
      complete_sequence_.push_back(id);
    }
    cond_var_.Signal();
  }

  base::Lock lock_;
  base::ConditionVariable cond_var_;

  // Ids of completed tasks, in completion order; guarded by |lock_|.
  std::vector<int> complete_sequence_;

  // Number of BlockTask() calls that have started; guarded by |lock_|.
  size_t started_events_;
};
// Test fixture that owns a message loop, a worker pool with kNumWorkerThreads
// threads (named "test") and a TestTracker shared with the posted tasks.
class SequencedWorkerPoolTest : public testing::Test {
 public:
  SequencedWorkerPoolTest() : tracker_(new TestTracker) { ResetPool(); }

  virtual ~SequencedWorkerPoolTest() {}

  virtual void SetUp() OVERRIDE {}

  // Shuts down whichever pool is current at the end of the test.
  virtual void TearDown() OVERRIDE { pool()->Shutdown(); }

  const scoped_refptr<SequencedWorkerPool>& pool() {
    return pool_owner_->pool();
  }

  TestTracker* tracker() { return tracker_.get(); }

  // Replaces the current pool owner (destroying the previous one, if any)
  // with a freshly created pool.
  void ResetPool() {
    pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
  }

  void SetWillWaitForShutdownCallback(const Closure& callback) {
    pool_owner_->SetWillWaitForShutdownCallback(callback);
  }

  // Posts one blocking task per worker thread, waits until all of them have
  // started, releases them, waits for completion and then clears the
  // tracker so the calling test starts from an empty record.
  void EnsureAllWorkersCreated() {
    ThreadBlocker blocker;
    for (size_t posted = 0; posted != kNumWorkerThreads; ++posted) {
      pool()->PostWorkerTask(
          FROM_HERE,
          base::Bind(&TestTracker::BlockTask, tracker(), -1, &blocker));
    }
    tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

    blocker.Unblock(kNumWorkerThreads);
    tracker()->WaitUntilTasksComplete(kNumWorkerThreads);

    tracker()->ClearCompleteSequence();
  }

  int has_work_call_count() const {
    return pool_owner_->has_work_call_count();
  }

 private:
  MessageLoop message_loop_;
  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
  const scoped_refptr<TestTracker> tracker_;
};
// Waits until |tracker| reports at least |expected_tasks_to_complete|
// completions, expects the count to be exactly that, and then releases
// |threads_to_awake| threads parked on |blocker|. The tests bind this as the
// pool's will-wait-for-shutdown callback.
void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
                                          size_t expected_tasks_to_complete,
                                          ThreadBlocker* blocker,
                                          size_t threads_to_awake) {
  const std::vector<int> completed =
      tracker->WaitUntilTasksComplete(expected_tasks_to_complete);
  EXPECT_EQ(expected_tasks_to_complete, completed.size());

  blocker->Unblock(threads_to_awake);
}
// Ref-counted sentinel whose destructor sets the shared |deleted_flag| to
// true, letting a test observe that a bound task argument was destroyed.
class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
 public:
  explicit DeletionHelper(
      const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
      : deleted_flag_(deleted_flag) {}

 private:
  friend class base::RefCountedThreadSafe<DeletionHelper>;

  virtual ~DeletionHelper() {
    deleted_flag_->data = true;
  }

  const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;

  DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
};
// Task body whose only purpose is to keep its two bound arguments alive while
// the task is pending. Running it is a test failure.
void HoldPoolReference(
    const scoped_refptr<base::SequencedWorkerPool>& pool,
    const scoped_refptr<DeletionHelper>& helper) {
  ADD_FAILURE() << "Should never run";
}
// Verifies that a delayed task which is still pending when the pool goes away
// is destroyed without being run, and that tearing the pool down does not
// wait for the task's delay to elapse.
TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
  // Post an immediate task and (below) wait for it, to confirm the pool is
  // processing work.
  EXPECT_TRUE(pool()->PostTask(
      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));

  scoped_refptr<base::RefCountedData<bool> > deleted_flag(
      new base::RefCountedData<bool>(false));

  // Measure elapsed time with base::TimeTicks rather than base::Time: the
  // wall clock may be adjusted while the test runs, which would make the
  // duration check below unreliable.
  const base::TimeTicks posted_at(base::TimeTicks::Now());

  // Post a delayed task that must never execute (HoldPoolReference reports a
  // failure if it runs). It holds the DeletionHelper that flips
  // |deleted_flag| on destruction.
  EXPECT_TRUE(pool()->PostDelayedTask(
      FROM_HERE,
      base::Bind(&HoldPoolReference,
                 pool(),
                 make_scoped_refptr(new DeletionHelper(deleted_flag))),
      TestTimeouts::action_timeout()));

  std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
  ASSERT_EQ(1u, completion_sequence.size());
  ASSERT_EQ(1, completion_sequence[0]);

  pool()->Shutdown();
  // ResetPool() replaces, and thereby destroys, the current pool owner.
  // NOTE(review): presumably this is what waits for the old pool to be fully
  // torn down -- confirm against SequencedWorkerPoolOwner.
  ResetPool();

  // Tear-down must not have waited until the delayed task was due.
  ASSERT_LT(base::TimeTicks::Now() - posted_at,
            TestTimeouts::action_timeout());

  // The delayed task must have been destroyed, not merely left unrun.
  ASSERT_TRUE(deleted_flag->data);
}
// Named sequence tokens must be distinct from each other and from unnamed
// tokens, and asking for the same name again must yield an equal token.
TEST_F(SequencedWorkerPoolTest, NamedTokens) {
  const std::string hello_name("hello");
  SequencedWorkerPool::SequenceToken hello_token =
      pool()->GetNamedSequenceToken(hello_name);

  SequencedWorkerPool::SequenceToken unnamed_token =
      pool()->GetSequenceToken();

  const std::string goodbye_name("goodbye");
  SequencedWorkerPool::SequenceToken goodbye_token =
      pool()->GetNamedSequenceToken(goodbye_name);

  // All three tokens are pairwise different.
  EXPECT_FALSE(hello_token.Equals(unnamed_token));
  EXPECT_FALSE(hello_token.Equals(goodbye_token));
  EXPECT_FALSE(unnamed_token.Equals(goodbye_token));

  // Looking a name up a second time returns the same token.
  SequencedWorkerPool::SequenceToken hello_token_again =
      pool()->GetNamedSequenceToken(hello_name);
  EXPECT_TRUE(hello_token.Equals(hello_token_again));

  SequencedWorkerPool::SequenceToken goodbye_token_again =
      pool()->GetNamedSequenceToken(goodbye_name);
  EXPECT_TRUE(goodbye_token.Equals(goodbye_token_again));
}
// Posts one slow task followed by many fast ones and checks that every task
// completes.
TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
  const size_t kNumTasks = 20;

  // Task 0 is the slow one; tasks 1..kNumTasks-1 are fast.
  pool()->PostWorkerTask(
      FROM_HERE, base::Bind(&TestTracker::SlowTask, tracker(), 0));
  for (size_t task_id = 1; task_id != kNumTasks; ++task_id) {
    pool()->PostWorkerTask(
        FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), task_id));
  }

  const std::vector<int> completed =
      tracker()->WaitUntilTasksComplete(kNumTasks);
  EXPECT_EQ(kNumTasks, completed.size());
}
// Same workload as LotsOfTasks, but posted to two additional pools that run
// side by side; every task from both pools must complete.
TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
  SequencedWorkerPoolOwner first_owner(kNumWorkerThreads, "test1");
  SequencedWorkerPoolOwner second_owner(kNumWorkerThreads, "test2");

  const size_t kNumTasks = 20;

  // One slow task per pool...
  base::Closure slow_closure =
      base::Bind(&TestTracker::SlowTask, tracker(), 0);
  first_owner.pool()->PostWorkerTask(FROM_HERE, slow_closure);
  second_owner.pool()->PostWorkerTask(FROM_HERE, slow_closure);

  // ...followed by kNumTasks - 1 fast tasks per pool.
  for (size_t task_id = 1; task_id != kNumTasks; ++task_id) {
    base::Closure fast_closure =
        base::Bind(&TestTracker::FastTask, tracker(), task_id);
    first_owner.pool()->PostWorkerTask(FROM_HERE, fast_closure);
    second_owner.pool()->PostWorkerTask(FROM_HERE, fast_closure);
  }

  const std::vector<int> completed =
      tracker()->WaitUntilTasksComplete(2 * kNumTasks);
  EXPECT_EQ(2 * kNumTasks, completed.size());

  second_owner.pool()->Shutdown();
  first_owner.pool()->Shutdown();
}
// Checks the ordering of sequenced tasks: tasks that share a sequence token
// complete in posting order, and tasks of a second sequence complete while
// the first sequence is still blocked.
TEST_F(SequencedWorkerPoolTest, Sequence) {
// Park blocking "background" tasks on all but one worker thread and wait
// until every one of them has started.
const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
ThreadBlocker background_blocker;
for (size_t i = 0; i < kNumBackgroundTasks; i++) {
pool()->PostWorkerTask(FROM_HERE,
base::Bind(&TestTracker::BlockTask,
tracker(), i, &background_blocker));
}
tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
// Sequence 1: task 100 blocks on |blocker|, task 101 is a fast task queued
// behind it on the same token.
ThreadBlocker blocker;
SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
pool()->PostSequencedWorkerTask(
token1, FROM_HERE,
base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
pool()->PostSequencedWorkerTask(
token1, FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 101));
// WaitUntilTasksComplete(0) does not wait; it returns the completions so
// far, of which there must be none.
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
// Sequence 2: two fast tasks (200, 201) on a different token.
SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
pool()->PostSequencedWorkerTask(
token2, FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 200));
pool()->PostSequencedWorkerTask(
token2, FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 201));
// Still nothing may have completed.
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
// Release one background task. Three completions are expected, the last two
// being sequence 2's tasks in posting order.
background_blocker.Unblock(1);
std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
ASSERT_EQ(3u, result.size());
EXPECT_EQ(200, result[1]);
EXPECT_EQ(201, result[2]);
// Release the remaining background tasks. The completion count must be
// exactly the background tasks plus sequence 2, i.e. sequence 1 has not
// completed anything yet.
background_blocker.Unblock(kNumBackgroundTasks - 1);
EXPECT_EQ(kNumBackgroundTasks + 2,
tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
// Release task 100. It and task 101 must be the final two completions, in
// that order.
blocker.Unblock(1);
result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
EXPECT_EQ(100, result[result.size() - 2]);
EXPECT_EQ(101, result[result.size() - 1]);
}
// Verifies that posting after Shutdown() fails for every shutdown behavior
// and does not change the pool's has-work call count. The DISABLED_ prefix
// keeps gtest from running this test by default.
TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
// Occupy every worker thread with a blocking task (ids 0..N-1).
EnsureAllWorkersCreated();
ThreadBlocker blocker;
for (size_t i = 0; i < kNumWorkerThreads; i++) {
pool()->PostWorkerTask(FROM_HERE,
base::Bind(&TestTracker::BlockTask,
tracker(), i, &blocker));
}
tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
// When this callback runs it expects that no task has completed yet and then
// releases all blocked tasks.
// NOTE(review): presumably the pool owner invokes it from within Shutdown(),
// just before waiting -- confirm against SequencedWorkerPoolOwner.
SetWillWaitForShutdownCallback(
base::Bind(&EnsureTasksToCompleteCountAndUnblock,
scoped_refptr<TestTracker>(tracker()), 0,
&blocker, kNumWorkerThreads));
const int kMaxNewBlockingTasksAfterShutdown = 100;
pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
// Snapshot taken after shutdown; the rejected posts below must leave it
// unchanged.
int old_has_work_call_count = has_work_call_count();
// Every originally blocked task must have completed.
std::vector<int> result =
tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
ASSERT_EQ(kNumWorkerThreads, result.size());
for (size_t i = 0; i < kNumWorkerThreads; i++) {
EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
result.end());
}
// Posting must now fail for each of the three shutdown behaviors.
EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 100),
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 101),
SequencedWorkerPool::SKIP_ON_SHUTDOWN));
EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 102),
SequencedWorkerPool::BLOCK_SHUTDOWN));
ASSERT_EQ(old_has_work_call_count, has_work_call_count());
}
// Exercises Shutdown(max_new_blocking_tasks): tasks that were queued before
// shutdown try to post further tasks while the pool is shutting down.
TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
// Occupy every worker thread with a blocking task.
EnsureAllWorkersCreated();
ThreadBlocker blocker;
const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
for (int i = 0; i < kNumBlockTasks; ++i) {
EXPECT_TRUE(pool()->PostWorkerTask(
FROM_HERE,
base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
}
tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
// Queue BLOCK_SHUTDOWN tasks behind them. Each one calls
// PostAdditionalTasks(), which expects its CONTINUE_ON_SHUTDOWN and
// SKIP_ON_SHUTDOWN posts to return false and does not check its
// BLOCK_SHUTDOWN post.
const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
for (int i = 0; i < kNumQueuedTasks; ++i) {
EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
false),
SequencedWorkerPool::BLOCK_SHUTDOWN));
}
// When this callback runs it expects zero completions and then releases the
// blocked tasks.
// NOTE(review): presumably invoked from within Shutdown() -- confirm.
SetWillWaitForShutdownCallback(
base::Bind(&EnsureTasksToCompleteCountAndUnblock,
scoped_refptr<TestTracker>(tracker()),
0, &blocker, kNumBlockTasks));
// Allow a limited number of new blocking tasks during shutdown.
const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
pool()->Shutdown(kNumNewBlockingTasksToAllow);
// Wait for the blocked tasks, the queued tasks and the allowed number of
// newly posted blocking tasks, then reset the tracker.
tracker()->WaitUntilTasksComplete(static_cast<size_t>(
kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
tracker()->ClearCompleteSequence();
}
// Verifies which queued tasks survive Shutdown(): of three queued tasks, one
// per shutdown behavior, only the BLOCK_SHUTDOWN one is expected to complete.
TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
// Occupy every worker thread with a blocking task (ids 0..N-1).
EnsureAllWorkersCreated();
ThreadBlocker blocker;
for (size_t i = 0; i < kNumWorkerThreads; i++) {
pool()->PostWorkerTask(FROM_HERE,
base::Bind(&TestTracker::BlockTask,
tracker(), i, &blocker));
}
tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
// Queue one fast task per shutdown behavior: 100 (CONTINUE_ON_SHUTDOWN),
// 101 (SKIP_ON_SHUTDOWN) and 102 (BLOCK_SHUTDOWN).
pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 100),
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 101),
SequencedWorkerPool::SKIP_ON_SHUTDOWN);
pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 102),
SequencedWorkerPool::BLOCK_SHUTDOWN);
// When this callback runs it expects zero completions and then releases the
// blocked tasks.
// NOTE(review): presumably invoked from within Shutdown() -- confirm.
SetWillWaitForShutdownCallback(
base::Bind(&EnsureTasksToCompleteCountAndUnblock,
scoped_refptr<TestTracker>(tracker()), 0,
&blocker, kNumWorkerThreads));
pool()->Shutdown();
// Exactly the blocking tasks plus task 102 must have completed.
std::vector<int> result =
tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
ASSERT_EQ(kNumWorkerThreads + 1, result.size());
for (size_t i = 0; i < kNumWorkerThreads; i++) {
EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
result.end());
}
EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
}
// Verifies that Shutdown() returns while CONTINUE_ON_SHUTDOWN tasks are still
// running, that posting afterwards fails, and that the running tasks can
// still finish once released.
TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
// Obtain both runner flavors with CONTINUE_ON_SHUTDOWN behavior before any
// work is posted.
scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
scoped_refptr<SequencedTaskRunner> sequenced_runner(
pool()->GetSequencedTaskRunnerWithShutdownBehavior(
pool()->GetSequenceToken(),
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
EnsureAllWorkersCreated();
// Start three blocking tasks: one posted on the pool directly, one through
// each runner.
ThreadBlocker blocker;
pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::BlockTask,
tracker(), 0, &blocker),
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
runner->PostTask(
FROM_HERE,
base::Bind(&TestTracker::BlockTask,
tracker(), 1, &blocker));
sequenced_runner->PostTask(
FROM_HERE,
base::Bind(&TestTracker::BlockTask,
tracker(), 2, &blocker));
tracker()->WaitUntilTasksBlocked(3);
// Shutdown() is called while all three tasks are still blocked, so nothing
// has completed when it returns.
pool()->Shutdown();
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
// Posting after shutdown must fail through all three entry points.
EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
EXPECT_FALSE(runner->PostTask(
FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
EXPECT_FALSE(sequenced_runner->PostTask(
FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
// Release the three tasks and let them finish.
blocker.Unblock(3);
std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
EXPECT_EQ(3u, result.size());
}
// Verifies SKIP_ON_SHUTDOWN: tasks already running when Shutdown() is called
// complete, while a queued task with the same behavior is expected not to.
TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
// Occupy every worker thread with a blocking SKIP_ON_SHUTDOWN task
// (ids 0..N-1).
EnsureAllWorkersCreated();
ThreadBlocker blocker;
for (size_t i = 0; i < kNumWorkerThreads; i++) {
pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
SequencedWorkerPool::SKIP_ON_SHUTDOWN);
}
tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
// Post one more SKIP_ON_SHUTDOWN task while all workers are busy.
pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::BlockTask,
tracker(), 0, &blocker),
SequencedWorkerPool::SKIP_ON_SHUTDOWN);
// When this callback runs it expects zero completions and then releases
// kNumWorkerThreads blocked tasks.
// NOTE(review): presumably invoked from within Shutdown() -- confirm.
SetWillWaitForShutdownCallback(
base::Bind(&EnsureTasksToCompleteCountAndUnblock,
scoped_refptr<TestTracker>(tracker()), 0,
&blocker, kNumWorkerThreads));
// Nothing may have completed before shutdown.
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
pool()->Shutdown();
// Exactly kNumWorkerThreads tasks must have completed, covering every id in
// 0..N-1; the extra queued task is therefore not among the completions.
std::vector<int> result =
tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
ASSERT_EQ(kNumWorkerThreads, result.size());
for (size_t i = 0; i < kNumWorkerThreads; i++) {
EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
result.end());
}
}
// Signals the pool that it has work when it has none and checks that exactly
// one additional has-work call is counted.
TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
  EnsureAllWorkersCreated();

  const int calls_before_signal = has_work_call_count();
  pool()->SignalHasWorkForTesting();

  // Give the pool a moment before sampling the counter again.
  base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));

  const int expected_calls = calls_before_signal + 1;
  EXPECT_EQ(expected_calls, has_work_call_count());
}
// Task body for the IsRunningOnCurrentThread test, meant to run on a worker
// of |pool|. It expects |pool| to report that it runs tasks on the current
// thread and that it is running |test_positive_token| but not
// |test_negative_token|, and expects |unused_pool| to answer false to all of
// the same queries.
void IsRunningOnCurrentThreadTask(
SequencedWorkerPool::SequenceToken test_positive_token,
SequencedWorkerPool::SequenceToken test_negative_token,
SequencedWorkerPool* pool,
SequencedWorkerPool* unused_pool) {
EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
// The other pool must not claim this thread or either sequence.
EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
EXPECT_FALSE(
unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
EXPECT_FALSE(
unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
}
// Checks RunsTasksOnCurrentThread() and IsRunningSequenceOnCurrentThread()
// both from the test thread (all false) and from inside tasks posted to the
// pool (via IsRunningOnCurrentThreadTask).
TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
// Default-constructed token, used below for the task posted without a
// sequence.
SequencedWorkerPool::SequenceToken unsequenced_token;
// A second pool that never receives any task.
scoped_refptr<SequencedWorkerPool> unused_pool =
new SequencedWorkerPool(2, "unused_pool");
// From the test thread, neither pool may claim the thread or any sequence.
EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
EXPECT_FALSE(
unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
// Each posted task checks, from a worker, that its own token is the running
// sequence and that a different token is not.
pool()->PostSequencedWorkerTask(
token1, FROM_HERE,
base::Bind(&IsRunningOnCurrentThreadTask,
token1, token2, pool(), unused_pool));
pool()->PostSequencedWorkerTask(
token2, FROM_HERE,
base::Bind(&IsRunningOnCurrentThreadTask,
token2, unsequenced_token, pool(), unused_pool));
pool()->PostWorkerTask(
FROM_HERE,
base::Bind(&IsRunningOnCurrentThreadTask,
unsequenced_token, token1, pool(), unused_pool));
pool()->Shutdown();
unused_pool->Shutdown();
}
// Exercises FlushForTesting(): on an idle pool, with a mix of pending tasks
// (including a task that posts more tasks), repeatedly, and after Shutdown().
TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
// Flushing a pool that has no work.
pool()->FlushForTesting();
// A task delayed by five minutes; it is not part of the completion count
// expected below.
pool()->PostDelayedWorkerTask(
FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 0),
TimeDelta::FromMinutes(5));
// One slow task, kNumFastTasks fast tasks, and one task that itself posts
// three more fast tasks (expecting those posts to succeed).
pool()->PostWorkerTask(FROM_HERE,
base::Bind(&TestTracker::SlowTask, tracker(), 0));
const size_t kNumFastTasks = 20;
for (size_t i = 0; i < kNumFastTasks; i++) {
pool()->PostWorkerTask(FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 0));
}
pool()->PostWorkerTask(
FROM_HERE,
base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
true));
// Before the flush something other than the fixture still references the
// tracker; after the flush the fixture's reference must be the only one.
EXPECT_FALSE(tracker()->HasOneRef());
pool()->FlushForTesting();
EXPECT_TRUE(tracker()->HasOneRef());
// Slow task + fast tasks + the posting task + its three additional tasks.
EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
// Flushing again, including after Shutdown(), is exercised as well.
pool()->FlushForTesting();
pool()->FlushForTesting();
pool()->Shutdown();
pool()->FlushForTesting();
}
// Creates a pool, obtains a CONTINUE_ON_SHUTDOWN sequenced task runner from it
// and shuts the pool down while that runner is still alive. The test passes
// if this completes without crashing or failing a check.
TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
  MessageLoop loop;
  scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));

  SequencedWorkerPool::SequenceToken sequence_token =
      pool->GetSequenceToken();
  scoped_refptr<SequencedTaskRunner> task_runner =
      pool->GetSequencedTaskRunnerWithShutdownBehavior(
          sequence_token, base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);

  // Shut down with |task_runner| still in scope.
  pool->Shutdown();
}
class SequencedWorkerPoolTaskRunnerTestDelegate {
public:
SequencedWorkerPoolTaskRunnerTestDelegate() {}
~SequencedWorkerPoolTaskRunnerTestDelegate() {}
void StartTaskRunner() {
pool_owner_.reset(
new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
}
scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
return pool_owner_->pool();
}
void StopTaskRunner() {
pool_owner_->pool()->FlushForTesting();
pool_owner_->pool()->Shutdown();
}
private:
MessageLoop message_loop_;
scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
};
// Instantiates the TaskRunnerTest typed test case, under the prefix
// "SequencedWorkerPool", with the delegate that returns the pool itself.
INSTANTIATE_TYPED_TEST_CASE_P(
SequencedWorkerPool, TaskRunnerTest,
SequencedWorkerPoolTaskRunnerTestDelegate);
class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
public:
SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
}
void StartTaskRunner() {
pool_owner_.reset(
new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
SequencedWorkerPool::BLOCK_SHUTDOWN);
}
scoped_refptr<TaskRunner> GetTaskRunner() {
return task_runner_;
}
void StopTaskRunner() {
pool_owner_->pool()->FlushForTesting();
pool_owner_->pool()->Shutdown();
}
private:
MessageLoop message_loop_;
scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
scoped_refptr<TaskRunner> task_runner_;
};
// Instantiates the TaskRunnerTest typed test case, under the prefix
// "SequencedWorkerPoolTaskRunner", with the delegate that returns a
// BLOCK_SHUTDOWN task runner obtained from the pool.
INSTANTIATE_TYPED_TEST_CASE_P(
SequencedWorkerPoolTaskRunner, TaskRunnerTest,
SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
public:
SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
}
void StartTaskRunner() {
pool_owner_.reset(new SequencedWorkerPoolOwner(
10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
pool_owner_->pool()->GetSequenceToken());
}
scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
return task_runner_;
}
void StopTaskRunner() {
pool_owner_->pool()->FlushForTesting();
pool_owner_->pool()->Shutdown();
}
private:
MessageLoop message_loop_;
scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
scoped_refptr<SequencedTaskRunner> task_runner_;
};
// The sequenced-runner delegate is used to instantiate two typed test cases
// under the prefix "SequencedWorkerPoolSequencedTaskRunner": the generic
// TaskRunnerTest and the SequencedTaskRunnerTest.
INSTANTIATE_TYPED_TEST_CASE_P(
SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
INSTANTIATE_TYPED_TEST_CASE_P(
SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
}
}