This source file includes following definitions.
- kOneHundredMs
- TEST_F
- TEST_F
- BackInTime
- TEST_F
- TEST_F
- TEST_F
- shutdown_
- GetThreadId
- EveryIdWasAllocated
- GetAnAssignment
- WorkIsCompleted
- task_count
- allow_help_requests
- shutdown
- ThreadSafeCheckShutdown
- thread_shutting_down
- lock
- work_is_available
- all_threads_have_ids
- no_more_tasks
- ResetHistory
- GetMinCompletionsByWorkerThread
- GetMaxCompletionsByWorkerThread
- GetNumThreadsTakingAssignments
- GetNumThreadsCompletingTasks
- GetNumberOfCompletedTasks
- SetWorkTime
- SetTaskCount
- SetAllowHelp
- SetShutdown
- SpinUntilAllThreadsAreWaiting
- SpinUntilTaskCountLessThan
- ThreadMain
#include <time.h>
#include <algorithm>
#include <vector>
#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/scoped_ptr.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/spin_wait.h"
#include "base/threading/platform_thread.h"
#include "base/threading/thread.h"
#include "base/threading/thread_collision_warner.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/platform_test.h"
namespace base {
namespace {
class ConditionVariableTest : public PlatformTest {
public:
const TimeDelta kZeroMs;
const TimeDelta kTenMs;
const TimeDelta kThirtyMs;
const TimeDelta kFortyFiveMs;
const TimeDelta kSixtyMs;
const TimeDelta kOneHundredMs;
ConditionVariableTest()
: kZeroMs(TimeDelta::FromMilliseconds(0)),
kTenMs(TimeDelta::FromMilliseconds(10)),
kThirtyMs(TimeDelta::FromMilliseconds(30)),
kFortyFiveMs(TimeDelta::FromMilliseconds(45)),
kSixtyMs(TimeDelta::FromMilliseconds(60)),
kOneHundredMs(TimeDelta::FromMilliseconds(100)) {
}
};
class WorkQueue : public PlatformThread::Delegate {
public:
explicit WorkQueue(int thread_count);
virtual ~WorkQueue();
virtual void ThreadMain() OVERRIDE;
int GetThreadId();
bool EveryIdWasAllocated() const;
TimeDelta GetAnAssignment(int thread_id);
void WorkIsCompleted(int thread_id);
int task_count() const;
bool allow_help_requests() const;
bool shutdown() const;
void thread_shutting_down();
Lock* lock();
ConditionVariable* work_is_available();
ConditionVariable* all_threads_have_ids();
ConditionVariable* no_more_tasks();
void ResetHistory();
int GetMinCompletionsByWorkerThread() const;
int GetMaxCompletionsByWorkerThread() const;
int GetNumThreadsTakingAssignments() const;
int GetNumThreadsCompletingTasks() const;
int GetNumberOfCompletedTasks() const;
void SetWorkTime(TimeDelta delay);
void SetTaskCount(int count);
void SetAllowHelp(bool allow);
void SpinUntilAllThreadsAreWaiting();
void SpinUntilTaskCountLessThan(int task_count);
void SetShutdown();
bool ThreadSafeCheckShutdown(int thread_count);
private:
Lock lock_;
ConditionVariable work_is_available_;
ConditionVariable all_threads_have_ids_;
ConditionVariable no_more_tasks_;
const int thread_count_;
int waiting_thread_count_;
scoped_ptr<PlatformThreadHandle[]> thread_handles_;
std::vector<int> assignment_history_;
std::vector<int> completion_history_;
int thread_started_counter_;
int shutdown_task_count_;
int task_count_;
TimeDelta worker_delay_;
bool allow_help_requests_;
bool shutdown_;
DFAKE_MUTEX(locked_methods_);
};
TEST_F(ConditionVariableTest, StartupShutdownTest) {
Lock lock;
{
ConditionVariable cv1(&lock);
}
ConditionVariable cv(&lock);
lock.Acquire();
cv.TimedWait(kTenMs);
cv.TimedWait(kTenMs);
lock.Release();
lock.Acquire();
cv.TimedWait(kTenMs);
cv.TimedWait(kTenMs);
cv.TimedWait(kTenMs);
lock.Release();
}
TEST_F(ConditionVariableTest, TimeoutTest) {
Lock lock;
ConditionVariable cv(&lock);
lock.Acquire();
TimeTicks start = TimeTicks::Now();
const TimeDelta WAIT_TIME = TimeDelta::FromMilliseconds(300);
const TimeDelta FUDGE_TIME = TimeDelta::FromMilliseconds(50);
cv.TimedWait(WAIT_TIME + FUDGE_TIME);
TimeDelta duration = TimeTicks::Now() - start;
EXPECT_TRUE(duration >= WAIT_TIME);
lock.Release();
}
#if defined(OS_POSIX)
const int kDiscontinuitySeconds = 2;
void BackInTime(Lock* lock) {
AutoLock auto_lock(*lock);
timeval tv;
gettimeofday(&tv, NULL);
tv.tv_sec -= kDiscontinuitySeconds;
settimeofday(&tv, NULL);
}
TEST_F(ConditionVariableTest, DISABLED_TimeoutAcrossSetTimeOfDay) {
timeval tv;
gettimeofday(&tv, NULL);
tv.tv_sec += kDiscontinuitySeconds;
if (settimeofday(&tv, NULL) < 0) {
PLOG(ERROR) << "Could not set time of day. Run as root?";
return;
}
Lock lock;
ConditionVariable cv(&lock);
lock.Acquire();
Thread thread("Helper");
thread.Start();
thread.message_loop()->PostTask(FROM_HERE, base::Bind(&BackInTime, &lock));
TimeTicks start = TimeTicks::Now();
const TimeDelta kWaitTime = TimeDelta::FromMilliseconds(300);
const TimeDelta kFudgeTime = TimeDelta::FromMilliseconds(50);
cv.TimedWait(kWaitTime + kFudgeTime);
TimeDelta duration = TimeTicks::Now() - start;
thread.Stop();
EXPECT_TRUE(duration >= kWaitTime);
EXPECT_TRUE(duration <= TimeDelta::FromSeconds(kDiscontinuitySeconds));
lock.Release();
}
#endif
#if defined(OS_WIN)
#define MAYBE_MultiThreadConsumerTest DISABLED_MultiThreadConsumerTest
#else
#define MAYBE_MultiThreadConsumerTest MultiThreadConsumerTest
#endif
TEST_F(ConditionVariableTest, MAYBE_MultiThreadConsumerTest) {
const int kThreadCount = 10;
WorkQueue queue(kThreadCount);
const int kTaskCount = 10;
Time start_time;
{
base::AutoLock auto_lock(*queue.lock());
while (!queue.EveryIdWasAllocated())
queue.all_threads_have_ids()->Wait();
}
queue.SpinUntilAllThreadsAreWaiting();
{
base::AutoLock auto_lock(*queue.lock());
EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments());
EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks());
EXPECT_EQ(0, queue.task_count());
EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread());
EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
EXPECT_EQ(0, queue.GetNumberOfCompletedTasks());
queue.ResetHistory();
queue.SetTaskCount(kTaskCount);
queue.SetWorkTime(kThirtyMs);
queue.SetAllowHelp(true);
start_time = Time::Now();
}
queue.work_is_available()->Signal();
queue.SpinUntilTaskCountLessThan(kTaskCount);
queue.SpinUntilAllThreadsAreWaiting();
{
base::AutoLock auto_lock(*queue.lock());
while (queue.task_count())
queue.no_more_tasks()->Wait();
EXPECT_LE(2, queue.GetNumThreadsTakingAssignments());
EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks());
queue.ResetHistory();
queue.SetTaskCount(3);
queue.SetWorkTime(kThirtyMs);
queue.SetAllowHelp(false);
}
queue.work_is_available()->Broadcast();
queue.SpinUntilTaskCountLessThan(3);
queue.SpinUntilAllThreadsAreWaiting();
{
base::AutoLock auto_lock(*queue.lock());
EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments());
EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks());
EXPECT_EQ(0, queue.task_count());
EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread());
EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
EXPECT_EQ(3, queue.GetNumberOfCompletedTasks());
queue.ResetHistory();
queue.SetTaskCount(3);
queue.SetWorkTime(kThirtyMs);
queue.SetAllowHelp(true);
}
queue.work_is_available()->Broadcast();
queue.SpinUntilTaskCountLessThan(3);
queue.SpinUntilAllThreadsAreWaiting();
{
base::AutoLock auto_lock(*queue.lock());
EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments());
EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks());
EXPECT_EQ(0, queue.task_count());
EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread());
EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
EXPECT_EQ(3, queue.GetNumberOfCompletedTasks());
queue.ResetHistory();
queue.SetTaskCount(20);
queue.SetWorkTime(kThirtyMs);
queue.SetAllowHelp(true);
}
queue.work_is_available()->Signal();
queue.SpinUntilTaskCountLessThan(20);
queue.SpinUntilAllThreadsAreWaiting();
{
base::AutoLock auto_lock(*queue.lock());
EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments());
EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks());
EXPECT_EQ(0, queue.task_count());
EXPECT_EQ(20, queue.GetNumberOfCompletedTasks());
queue.ResetHistory();
queue.SetTaskCount(20);
queue.SetWorkTime(kThirtyMs);
queue.SetAllowHelp(true);
}
queue.work_is_available()->Broadcast();
queue.SpinUntilTaskCountLessThan(20);
queue.SpinUntilAllThreadsAreWaiting();
{
base::AutoLock auto_lock(*queue.lock());
EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments());
EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks());
EXPECT_EQ(0, queue.task_count());
EXPECT_EQ(20, queue.GetNumberOfCompletedTasks());
queue.SetShutdown();
}
queue.work_is_available()->Broadcast();
SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
queue.ThreadSafeCheckShutdown(kThreadCount));
}
TEST_F(ConditionVariableTest, LargeFastTaskTest) {
const int kThreadCount = 200;
WorkQueue queue(kThreadCount);
Lock private_lock;
base::AutoLock private_held_lock(private_lock);
ConditionVariable private_cv(&private_lock);
{
base::AutoLock auto_lock(*queue.lock());
while (!queue.EveryIdWasAllocated())
queue.all_threads_have_ids()->Wait();
}
queue.SpinUntilAllThreadsAreWaiting();
{
base::AutoLock auto_lock(*queue.lock());
EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments());
EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks());
EXPECT_EQ(0, queue.task_count());
EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread());
EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
EXPECT_EQ(0, queue.GetNumberOfCompletedTasks());
queue.ResetHistory();
queue.SetTaskCount(20 * kThreadCount);
queue.SetWorkTime(kFortyFiveMs);
queue.SetAllowHelp(false);
}
queue.work_is_available()->Broadcast();
{
base::AutoLock auto_lock(*queue.lock());
while (queue.task_count() != 0)
queue.no_more_tasks()->Wait();
}
queue.SpinUntilAllThreadsAreWaiting();
{
base::AutoLock auto_lock(*queue.lock());
EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments());
EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks());
EXPECT_EQ(0, queue.task_count());
EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread());
EXPECT_EQ(20 * kThreadCount, queue.GetNumberOfCompletedTasks());
queue.ResetHistory();
queue.SetTaskCount(kThreadCount * 4);
queue.SetWorkTime(kFortyFiveMs);
queue.SetAllowHelp(true);
}
queue.work_is_available()->Signal();
{
base::AutoLock auto_lock(*queue.lock());
while (queue.task_count() != 0)
queue.no_more_tasks()->Wait();
}
queue.SpinUntilAllThreadsAreWaiting();
{
base::AutoLock auto_lock(*queue.lock());
EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments());
EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks());
EXPECT_EQ(0, queue.task_count());
EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread());
EXPECT_EQ(4 * kThreadCount, queue.GetNumberOfCompletedTasks());
queue.SetShutdown();
}
queue.work_is_available()->Broadcast();
SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
queue.ThreadSafeCheckShutdown(kThreadCount));
}
WorkQueue::WorkQueue(int thread_count)
: lock_(),
work_is_available_(&lock_),
all_threads_have_ids_(&lock_),
no_more_tasks_(&lock_),
thread_count_(thread_count),
waiting_thread_count_(0),
thread_handles_(new PlatformThreadHandle[thread_count]),
assignment_history_(thread_count),
completion_history_(thread_count),
thread_started_counter_(0),
shutdown_task_count_(0),
task_count_(0),
allow_help_requests_(false),
shutdown_(false) {
EXPECT_GE(thread_count_, 1);
ResetHistory();
SetTaskCount(0);
SetWorkTime(TimeDelta::FromMilliseconds(30));
for (int i = 0; i < thread_count_; ++i) {
PlatformThreadHandle pth;
EXPECT_TRUE(PlatformThread::Create(0, this, &pth));
thread_handles_[i] = pth;
}
}
WorkQueue::~WorkQueue() {
{
base::AutoLock auto_lock(lock_);
SetShutdown();
}
work_is_available_.Broadcast();
for (int i = 0; i < thread_count_; ++i) {
PlatformThread::Join(thread_handles_[i]);
}
EXPECT_EQ(0, waiting_thread_count_);
}
int WorkQueue::GetThreadId() {
DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
DCHECK(!EveryIdWasAllocated());
return thread_started_counter_++;
}
bool WorkQueue::EveryIdWasAllocated() const {
DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
return thread_count_ == thread_started_counter_;
}
TimeDelta WorkQueue::GetAnAssignment(int thread_id) {
DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
DCHECK_LT(0, task_count_);
assignment_history_[thread_id]++;
if (0 == --task_count_) {
no_more_tasks_.Signal();
}
return worker_delay_;
}
void WorkQueue::WorkIsCompleted(int thread_id) {
DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
completion_history_[thread_id]++;
}
int WorkQueue::task_count() const {
DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
return task_count_;
}
bool WorkQueue::allow_help_requests() const {
DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
return allow_help_requests_;
}
bool WorkQueue::shutdown() const {
lock_.AssertAcquired();
DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
return shutdown_;
}
bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) {
bool all_shutdown;
base::AutoLock auto_lock(lock_);
{
DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
all_shutdown = (shutdown_task_count_ == thread_count);
}
return all_shutdown;
}
void WorkQueue::thread_shutting_down() {
lock_.AssertAcquired();
DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
shutdown_task_count_++;
}
Lock* WorkQueue::lock() {
return &lock_;
}
ConditionVariable* WorkQueue::work_is_available() {
return &work_is_available_;
}
ConditionVariable* WorkQueue::all_threads_have_ids() {
return &all_threads_have_ids_;
}
ConditionVariable* WorkQueue::no_more_tasks() {
return &no_more_tasks_;
}
void WorkQueue::ResetHistory() {
for (int i = 0; i < thread_count_; ++i) {
assignment_history_[i] = 0;
completion_history_[i] = 0;
}
}
int WorkQueue::GetMinCompletionsByWorkerThread() const {
int minumum = completion_history_[0];
for (int i = 0; i < thread_count_; ++i)
minumum = std::min(minumum, completion_history_[i]);
return minumum;
}
int WorkQueue::GetMaxCompletionsByWorkerThread() const {
int maximum = completion_history_[0];
for (int i = 0; i < thread_count_; ++i)
maximum = std::max(maximum, completion_history_[i]);
return maximum;
}
int WorkQueue::GetNumThreadsTakingAssignments() const {
int count = 0;
for (int i = 0; i < thread_count_; ++i)
if (assignment_history_[i])
count++;
return count;
}
int WorkQueue::GetNumThreadsCompletingTasks() const {
int count = 0;
for (int i = 0; i < thread_count_; ++i)
if (completion_history_[i])
count++;
return count;
}
int WorkQueue::GetNumberOfCompletedTasks() const {
int total = 0;
for (int i = 0; i < thread_count_; ++i)
total += completion_history_[i];
return total;
}
void WorkQueue::SetWorkTime(TimeDelta delay) {
worker_delay_ = delay;
}
void WorkQueue::SetTaskCount(int count) {
task_count_ = count;
}
void WorkQueue::SetAllowHelp(bool allow) {
allow_help_requests_ = allow;
}
void WorkQueue::SetShutdown() {
lock_.AssertAcquired();
shutdown_ = true;
}
void WorkQueue::SpinUntilAllThreadsAreWaiting() {
while (true) {
{
base::AutoLock auto_lock(lock_);
if (waiting_thread_count_ == thread_count_)
break;
}
PlatformThread::Sleep(TimeDelta::FromMilliseconds(30));
}
}
void WorkQueue::SpinUntilTaskCountLessThan(int task_count) {
while (true) {
{
base::AutoLock auto_lock(lock_);
if (task_count_ < task_count)
break;
}
PlatformThread::Sleep(TimeDelta::FromMilliseconds(30));
}
}
void WorkQueue::ThreadMain() {
int thread_id;
{
base::AutoLock auto_lock(lock_);
thread_id = GetThreadId();
if (EveryIdWasAllocated())
all_threads_have_ids()->Signal();
}
Lock private_lock;
while (1) {
TimeDelta work_time;
bool could_use_help;
{
base::AutoLock auto_lock(lock_);
while (0 == task_count() && !shutdown()) {
++waiting_thread_count_;
work_is_available()->Wait();
--waiting_thread_count_;
}
if (shutdown()) {
thread_shutting_down();
return;
}
work_time = GetAnAssignment(thread_id);
could_use_help = (task_count() > 0) && allow_help_requests();
}
if (could_use_help)
work_is_available()->Signal();
if (work_time > TimeDelta::FromMilliseconds(0)) {
base::AutoLock auto_lock(private_lock);
ConditionVariable private_cv(&private_lock);
private_cv.TimedWait(work_time);
}
{
base::AutoLock auto_lock(lock_);
WorkIsCompleted(thread_id);
}
}
}
}
}