This source file includes following definitions.
- kOneHundredMs
- TEST_F
- TEST_F
- BackInTime
- TEST_F
- TEST_F
- TEST_F
- shutdown_
- GetThreadId
- EveryIdWasAllocated
- GetAnAssignment
- WorkIsCompleted
- task_count
- allow_help_requests
- shutdown
- ThreadSafeCheckShutdown
- thread_shutting_down
- lock
- work_is_available
- all_threads_have_ids
- no_more_tasks
- ResetHistory
- GetMinCompletionsByWorkerThread
- GetMaxCompletionsByWorkerThread
- GetNumThreadsTakingAssignments
- GetNumThreadsCompletingTasks
- GetNumberOfCompletedTasks
- SetWorkTime
- SetTaskCount
- SetAllowHelp
- SetShutdown
- SpinUntilAllThreadsAreWaiting
- SpinUntilTaskCountLessThan
- ThreadMain
#include <time.h>
#include <algorithm>
#include <vector>
#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/scoped_ptr.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/spin_wait.h"
#include "base/threading/platform_thread.h"
#include "base/threading/thread.h"
#include "base/threading/thread_collision_warner.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/platform_test.h"
namespace base {
namespace {
class ConditionVariableTest : public PlatformTest {
 public:
  const TimeDelta kZeroMs;
  const TimeDelta kTenMs;
  const TimeDelta kThirtyMs;
  const TimeDelta kFortyFiveMs;
  const TimeDelta kSixtyMs;
  const TimeDelta kOneHundredMs;
  ConditionVariableTest()
      : kZeroMs(TimeDelta::FromMilliseconds(0)),
        kTenMs(TimeDelta::FromMilliseconds(10)),
        kThirtyMs(TimeDelta::FromMilliseconds(30)),
        kFortyFiveMs(TimeDelta::FromMilliseconds(45)),
        kSixtyMs(TimeDelta::FromMilliseconds(60)),
        kOneHundredMs(TimeDelta::FromMilliseconds(100)) {
  }
};
class WorkQueue : public PlatformThread::Delegate {
 public:
  explicit WorkQueue(int thread_count);
  virtual ~WorkQueue();
  
  virtual void ThreadMain() OVERRIDE;
  
  
  
  int GetThreadId();  
  bool EveryIdWasAllocated() const;  
  TimeDelta GetAnAssignment(int thread_id);  
  void WorkIsCompleted(int thread_id);
  int task_count() const;
  bool allow_help_requests() const;  
  bool shutdown() const;  
  void thread_shutting_down();
  
  
  Lock* lock();
  ConditionVariable* work_is_available();
  ConditionVariable* all_threads_have_ids();
  ConditionVariable* no_more_tasks();
  
  
  
  void ResetHistory();
  int GetMinCompletionsByWorkerThread() const;
  int GetMaxCompletionsByWorkerThread() const;
  int GetNumThreadsTakingAssignments() const;
  int GetNumThreadsCompletingTasks() const;
  int GetNumberOfCompletedTasks() const;
  void SetWorkTime(TimeDelta delay);
  void SetTaskCount(int count);
  void SetAllowHelp(bool allow);
  
  
  void SpinUntilAllThreadsAreWaiting();
  void SpinUntilTaskCountLessThan(int task_count);
  
  void SetShutdown();
  
  
  
  bool ThreadSafeCheckShutdown(int thread_count);
 private:
  
  Lock lock_;
  ConditionVariable work_is_available_;  
  
  ConditionVariable all_threads_have_ids_;  
  ConditionVariable no_more_tasks_;  
  const int thread_count_;
  int waiting_thread_count_;
  scoped_ptr<PlatformThreadHandle[]> thread_handles_;
  std::vector<int> assignment_history_;  
  std::vector<int> completion_history_;  
  int thread_started_counter_;  
  int shutdown_task_count_;  
  int task_count_;  
  TimeDelta worker_delay_;  
  bool allow_help_requests_;  
  bool shutdown_;  
  DFAKE_MUTEX(locked_methods_);
};
TEST_F(ConditionVariableTest, StartupShutdownTest) {
  Lock lock;
  
  {
    ConditionVariable cv1(&lock);
  }  
  
  ConditionVariable cv(&lock);
  lock.Acquire();
  cv.TimedWait(kTenMs);  
  cv.TimedWait(kTenMs);  
  lock.Release();
  lock.Acquire();
  cv.TimedWait(kTenMs);  
  cv.TimedWait(kTenMs);  
  cv.TimedWait(kTenMs);  
  lock.Release();
}  
TEST_F(ConditionVariableTest, TimeoutTest) {
  Lock lock;
  ConditionVariable cv(&lock);
  lock.Acquire();
  TimeTicks start = TimeTicks::Now();
  const TimeDelta WAIT_TIME = TimeDelta::FromMilliseconds(300);
  
  const TimeDelta FUDGE_TIME = TimeDelta::FromMilliseconds(50);
  cv.TimedWait(WAIT_TIME + FUDGE_TIME);
  TimeDelta duration = TimeTicks::Now() - start;
  
  
  EXPECT_TRUE(duration >= WAIT_TIME);
  lock.Release();
}
#if defined(OS_POSIX)
const int kDiscontinuitySeconds = 2;
void BackInTime(Lock* lock) {
  AutoLock auto_lock(*lock);
  timeval tv;
  gettimeofday(&tv, NULL);
  tv.tv_sec -= kDiscontinuitySeconds;
  settimeofday(&tv, NULL);
}
TEST_F(ConditionVariableTest, DISABLED_TimeoutAcrossSetTimeOfDay) {
  timeval tv;
  gettimeofday(&tv, NULL);
  tv.tv_sec += kDiscontinuitySeconds;
  if (settimeofday(&tv, NULL) < 0) {
    PLOG(ERROR) << "Could not set time of day. Run as root?";
    return;
  }
  Lock lock;
  ConditionVariable cv(&lock);
  lock.Acquire();
  Thread thread("Helper");
  thread.Start();
  thread.message_loop()->PostTask(FROM_HERE, base::Bind(&BackInTime, &lock));
  TimeTicks start = TimeTicks::Now();
  const TimeDelta kWaitTime = TimeDelta::FromMilliseconds(300);
  
  const TimeDelta kFudgeTime = TimeDelta::FromMilliseconds(50);
  cv.TimedWait(kWaitTime + kFudgeTime);
  TimeDelta duration = TimeTicks::Now() - start;
  thread.Stop();
  
  
  EXPECT_TRUE(duration >= kWaitTime);
  EXPECT_TRUE(duration <= TimeDelta::FromSeconds(kDiscontinuitySeconds));
  lock.Release();
}
#endif
#if defined(OS_WIN)
#define MAYBE_MultiThreadConsumerTest DISABLED_MultiThreadConsumerTest
#else
#define MAYBE_MultiThreadConsumerTest MultiThreadConsumerTest
#endif
TEST_F(ConditionVariableTest, MAYBE_MultiThreadConsumerTest) {
  const int kThreadCount = 10;
  WorkQueue queue(kThreadCount);  
  const int kTaskCount = 10;  
  Time start_time;  
  {
    base::AutoLock auto_lock(*queue.lock());
    while (!queue.EveryIdWasAllocated())
      queue.all_threads_have_ids()->Wait();
  }
  
  
  queue.SpinUntilAllThreadsAreWaiting();
  {
    
    base::AutoLock auto_lock(*queue.lock());
    EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks());
    EXPECT_EQ(0, queue.task_count());
    EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread());
    EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
    EXPECT_EQ(0, queue.GetNumberOfCompletedTasks());
    
    
    queue.ResetHistory();
    queue.SetTaskCount(kTaskCount);
    queue.SetWorkTime(kThirtyMs);
    queue.SetAllowHelp(true);
    start_time = Time::Now();
  }
  queue.work_is_available()->Signal();  
  
  queue.SpinUntilTaskCountLessThan(kTaskCount);
  
  queue.SpinUntilAllThreadsAreWaiting();
  {
    
    base::AutoLock auto_lock(*queue.lock());
    while (queue.task_count())
      queue.no_more_tasks()->Wait();
    
    
    
    EXPECT_LE(2, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks());
    
    queue.ResetHistory();
    queue.SetTaskCount(3);
    queue.SetWorkTime(kThirtyMs);
    queue.SetAllowHelp(false);
  }
  queue.work_is_available()->Broadcast();  
  
  queue.SpinUntilTaskCountLessThan(3);
  
  queue.SpinUntilAllThreadsAreWaiting();
  {
    base::AutoLock auto_lock(*queue.lock());
    EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks());
    EXPECT_EQ(0, queue.task_count());
    EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread());
    EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
    EXPECT_EQ(3, queue.GetNumberOfCompletedTasks());
    
    queue.ResetHistory();
    queue.SetTaskCount(3);
    queue.SetWorkTime(kThirtyMs);
    queue.SetAllowHelp(true);  
  }
  queue.work_is_available()->Broadcast();  
  
  queue.SpinUntilTaskCountLessThan(3);
  
  queue.SpinUntilAllThreadsAreWaiting();
  {
    base::AutoLock auto_lock(*queue.lock());
    EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks());
    EXPECT_EQ(0, queue.task_count());
    EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread());
    EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
    EXPECT_EQ(3, queue.GetNumberOfCompletedTasks());
    
    queue.ResetHistory();
    queue.SetTaskCount(20);  
    queue.SetWorkTime(kThirtyMs);
    queue.SetAllowHelp(true);
  }
  queue.work_is_available()->Signal();  
  
  queue.SpinUntilTaskCountLessThan(20);
  
  queue.SpinUntilAllThreadsAreWaiting();  
  {
    base::AutoLock auto_lock(*queue.lock());
    EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks());
    EXPECT_EQ(0, queue.task_count());
    EXPECT_EQ(20, queue.GetNumberOfCompletedTasks());
    
    queue.ResetHistory();
    queue.SetTaskCount(20);  
    queue.SetWorkTime(kThirtyMs);
    queue.SetAllowHelp(true);
  }
  queue.work_is_available()->Broadcast();
  
  queue.SpinUntilTaskCountLessThan(20);
  
  queue.SpinUntilAllThreadsAreWaiting();  
  {
    base::AutoLock auto_lock(*queue.lock());
    EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks());
    EXPECT_EQ(0, queue.task_count());
    EXPECT_EQ(20, queue.GetNumberOfCompletedTasks());
    queue.SetShutdown();
  }
  queue.work_is_available()->Broadcast();  
  SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
                                   queue.ThreadSafeCheckShutdown(kThreadCount));
}
TEST_F(ConditionVariableTest, LargeFastTaskTest) {
  const int kThreadCount = 200;
  WorkQueue queue(kThreadCount);  
  Lock private_lock;  
  base::AutoLock private_held_lock(private_lock);
  ConditionVariable private_cv(&private_lock);
  {
    base::AutoLock auto_lock(*queue.lock());
    while (!queue.EveryIdWasAllocated())
      queue.all_threads_have_ids()->Wait();
  }
  
  queue.SpinUntilAllThreadsAreWaiting();
  {
    
    base::AutoLock auto_lock(*queue.lock());
    EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks());
    EXPECT_EQ(0, queue.task_count());
    EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread());
    EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
    EXPECT_EQ(0, queue.GetNumberOfCompletedTasks());
    
    queue.ResetHistory();
    queue.SetTaskCount(20 * kThreadCount);
    queue.SetWorkTime(kFortyFiveMs);
    queue.SetAllowHelp(false);
  }
  queue.work_is_available()->Broadcast();  
  
  {
    base::AutoLock auto_lock(*queue.lock());
    while (queue.task_count() != 0)
      queue.no_more_tasks()->Wait();
  }
  
  queue.SpinUntilAllThreadsAreWaiting();
  {
    
    
    base::AutoLock auto_lock(*queue.lock());
    EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks());
    EXPECT_EQ(0, queue.task_count());
    EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread());
    EXPECT_EQ(20 * kThreadCount, queue.GetNumberOfCompletedTasks());
    
    queue.ResetHistory();
    queue.SetTaskCount(kThreadCount * 4);
    queue.SetWorkTime(kFortyFiveMs);
    queue.SetAllowHelp(true);  
  }
  queue.work_is_available()->Signal();  
  
  {
    base::AutoLock auto_lock(*queue.lock());
    while (queue.task_count() != 0)
      queue.no_more_tasks()->Wait();
  }
  
  queue.SpinUntilAllThreadsAreWaiting();
  {
    
    
    base::AutoLock auto_lock(*queue.lock());
    EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks());
    EXPECT_EQ(0, queue.task_count());
    EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread());
    EXPECT_EQ(4 * kThreadCount, queue.GetNumberOfCompletedTasks());
    queue.SetShutdown();
  }
  queue.work_is_available()->Broadcast();  
  
  SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
                                   queue.ThreadSafeCheckShutdown(kThreadCount));
}
WorkQueue::WorkQueue(int thread_count)
  : lock_(),
    work_is_available_(&lock_),
    all_threads_have_ids_(&lock_),
    no_more_tasks_(&lock_),
    thread_count_(thread_count),
    waiting_thread_count_(0),
    thread_handles_(new PlatformThreadHandle[thread_count]),
    assignment_history_(thread_count),
    completion_history_(thread_count),
    thread_started_counter_(0),
    shutdown_task_count_(0),
    task_count_(0),
    allow_help_requests_(false),
    shutdown_(false) {
  EXPECT_GE(thread_count_, 1);
  ResetHistory();
  SetTaskCount(0);
  SetWorkTime(TimeDelta::FromMilliseconds(30));
  for (int i = 0; i < thread_count_; ++i) {
    PlatformThreadHandle pth;
    EXPECT_TRUE(PlatformThread::Create(0, this, &pth));
    thread_handles_[i] = pth;
  }
}
WorkQueue::~WorkQueue() {
  {
    base::AutoLock auto_lock(lock_);
    SetShutdown();
  }
  work_is_available_.Broadcast();  
  for (int i = 0; i < thread_count_; ++i) {
    PlatformThread::Join(thread_handles_[i]);
  }
  EXPECT_EQ(0, waiting_thread_count_);
}
int WorkQueue::GetThreadId() {
  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
  DCHECK(!EveryIdWasAllocated());
  return thread_started_counter_++;  
}
bool WorkQueue::EveryIdWasAllocated() const {
  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
  return thread_count_ == thread_started_counter_;
}
TimeDelta WorkQueue::GetAnAssignment(int thread_id) {
  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
  DCHECK_LT(0, task_count_);
  assignment_history_[thread_id]++;
  if (0 == --task_count_) {
    no_more_tasks_.Signal();
  }
  return worker_delay_;
}
void WorkQueue::WorkIsCompleted(int thread_id) {
  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
  completion_history_[thread_id]++;
}
int WorkQueue::task_count() const {
  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
  return task_count_;
}
bool WorkQueue::allow_help_requests() const {
  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
  return allow_help_requests_;
}
bool WorkQueue::shutdown() const {
  lock_.AssertAcquired();
  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
  return shutdown_;
}
bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) {
  bool all_shutdown;
  base::AutoLock auto_lock(lock_);
  {
    
    DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
    all_shutdown = (shutdown_task_count_ == thread_count);
  }
  return all_shutdown;
}
void WorkQueue::thread_shutting_down() {
  lock_.AssertAcquired();
  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
  shutdown_task_count_++;
}
Lock* WorkQueue::lock() {
  return &lock_;
}
ConditionVariable* WorkQueue::work_is_available() {
  return &work_is_available_;
}
ConditionVariable* WorkQueue::all_threads_have_ids() {
  return &all_threads_have_ids_;
}
ConditionVariable* WorkQueue::no_more_tasks() {
  return &no_more_tasks_;
}
void WorkQueue::ResetHistory() {
  for (int i = 0; i < thread_count_; ++i) {
    assignment_history_[i] = 0;
    completion_history_[i] = 0;
  }
}
int WorkQueue::GetMinCompletionsByWorkerThread() const {
  int minumum = completion_history_[0];
  for (int i = 0; i < thread_count_; ++i)
    minumum = std::min(minumum, completion_history_[i]);
  return minumum;
}
int WorkQueue::GetMaxCompletionsByWorkerThread() const {
  int maximum = completion_history_[0];
  for (int i = 0; i < thread_count_; ++i)
    maximum = std::max(maximum, completion_history_[i]);
  return maximum;
}
int WorkQueue::GetNumThreadsTakingAssignments() const {
  int count = 0;
  for (int i = 0; i < thread_count_; ++i)
    if (assignment_history_[i])
      count++;
  return count;
}
int WorkQueue::GetNumThreadsCompletingTasks() const {
  int count = 0;
  for (int i = 0; i < thread_count_; ++i)
    if (completion_history_[i])
      count++;
  return count;
}
int WorkQueue::GetNumberOfCompletedTasks() const {
  int total = 0;
  for (int i = 0; i < thread_count_; ++i)
    total += completion_history_[i];
  return total;
}
void WorkQueue::SetWorkTime(TimeDelta delay) {
  worker_delay_ = delay;
}
void WorkQueue::SetTaskCount(int count) {
  task_count_ = count;
}
void WorkQueue::SetAllowHelp(bool allow) {
  allow_help_requests_ = allow;
}
void WorkQueue::SetShutdown() {
  lock_.AssertAcquired();
  shutdown_ = true;
}
void WorkQueue::SpinUntilAllThreadsAreWaiting() {
  while (true) {
    {
      base::AutoLock auto_lock(lock_);
      if (waiting_thread_count_ == thread_count_)
        break;
    }
    PlatformThread::Sleep(TimeDelta::FromMilliseconds(30));
  }
}
void WorkQueue::SpinUntilTaskCountLessThan(int task_count) {
  while (true) {
    {
      base::AutoLock auto_lock(lock_);
      if (task_count_ < task_count)
        break;
    }
    PlatformThread::Sleep(TimeDelta::FromMilliseconds(30));
  }
}
void WorkQueue::ThreadMain() {
  int thread_id;
  {
    base::AutoLock auto_lock(lock_);
    thread_id = GetThreadId();
    if (EveryIdWasAllocated())
      all_threads_have_ids()->Signal();  
  }
  Lock private_lock;  
  while (1) {  
    TimeDelta work_time;
    bool could_use_help;
    {
      base::AutoLock auto_lock(lock_);
      while (0 == task_count() && !shutdown()) {
        ++waiting_thread_count_;
        work_is_available()->Wait();
        --waiting_thread_count_;
      }
      if (shutdown()) {
        
        thread_shutting_down();
        return;  
      }
      
      work_time = GetAnAssignment(thread_id);
      could_use_help = (task_count() > 0) && allow_help_requests();
    }  
    
    if (could_use_help)
      work_is_available()->Signal();  
    if (work_time > TimeDelta::FromMilliseconds(0)) {
      
      
      base::AutoLock auto_lock(private_lock);
      ConditionVariable private_cv(&private_lock);
      private_cv.TimedWait(work_time);  
    }
    {
      base::AutoLock auto_lock(lock_);
      
      WorkIsCompleted(thread_id);
    }
  }
}
}  
}