root/base/threading/sequenced_worker_pool_unittest.cc

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. unblock_counter_
  2. Block
  3. Unblock
  4. started_events_
  5. FastTask
  6. SlowTask
  7. BlockTask
  8. PostAdditionalTasks
  9. WaitUntilTasksBlocked
  10. WaitUntilTasksComplete
  11. GetTasksCompletedCount
  12. ClearCompleteSequence
  13. SignalWorkerDone
  14. SetUp
  15. TearDown
  16. pool
  17. tracker
  18. ResetPool
  19. SetWillWaitForShutdownCallback
  20. EnsureAllWorkersCreated
  21. has_work_call_count
  22. EnsureTasksToCompleteCountAndUnblock
  23. HoldPoolReference
  24. TEST_F
  25. TEST_F
  26. TEST_F
  27. TEST_F
  28. TEST_F
  29. TEST_F
  30. TEST_F
  31. TEST_F
  32. TEST_F
  33. TEST_F
  34. TEST_F
  35. IsRunningOnCurrentThreadTask
  36. TEST_F
  37. TEST_F
  38. TEST
  39. StartTaskRunner
  40. GetTaskRunner
  41. StopTaskRunner
  42. StartTaskRunner
  43. GetTaskRunner
  44. StopTaskRunner
  45. StartTaskRunner
  46. GetTaskRunner
  47. StopTaskRunner

// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/threading/sequenced_worker_pool.h"

#include <algorithm>

#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/test/sequenced_task_runner_test_template.h"
#include "base/test/sequenced_worker_pool_owner.h"
#include "base/test/task_runner_test_template.h"
#include "base/test/test_timeouts.h"
#include "base/threading/platform_thread.h"
#include "base/time/time.h"
#include "base/tracked_objects.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace base {

// IMPORTANT NOTE:
//
// Many of these tests have failure modes where they'll hang forever. These
// tests should not be flaky, and hanging indicates a type of failure. Do not
// mark as flaky if they're hanging, it's likely an actual bug.

namespace {

const size_t kNumWorkerThreads = 3;

// Allows a number of threads to all be blocked on the same event, and
// provides a way to unblock a certain number of them.
class ThreadBlocker {
 public:
  ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}

  void Block() {
    {
      base::AutoLock lock(lock_);
      while (unblock_counter_ == 0)
        cond_var_.Wait();
      unblock_counter_--;
    }
    cond_var_.Signal();
  }

  void Unblock(size_t count) {
    {
      base::AutoLock lock(lock_);
      DCHECK(unblock_counter_ == 0);
      unblock_counter_ = count;
    }
    cond_var_.Signal();
  }

 private:
  base::Lock lock_;
  base::ConditionVariable cond_var_;

  size_t unblock_counter_;
};

class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
 public:
  TestTracker()
      : lock_(),
        cond_var_(&lock_),
        started_events_(0) {
  }

  // Each of these tasks appends the argument to the complete sequence vector
  // so calling code can see what order they finished in.
  void FastTask(int id) {
    SignalWorkerDone(id);
  }

  void SlowTask(int id) {
    base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
    SignalWorkerDone(id);
  }

  void BlockTask(int id, ThreadBlocker* blocker) {
    // Note that this task has started and signal anybody waiting for that
    // to happen.
    {
      base::AutoLock lock(lock_);
      started_events_++;
    }
    cond_var_.Signal();

    blocker->Block();
    SignalWorkerDone(id);
  }

  void PostAdditionalTasks(
        int id, SequencedWorkerPool* pool,
        bool expected_return_value) {
    Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
    EXPECT_EQ(expected_return_value,
              pool->PostWorkerTaskWithShutdownBehavior(
                  FROM_HERE, fast_task,
                  SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
    EXPECT_EQ(expected_return_value,
              pool->PostWorkerTaskWithShutdownBehavior(
                  FROM_HERE, fast_task,
                  SequencedWorkerPool::SKIP_ON_SHUTDOWN));
    pool->PostWorkerTaskWithShutdownBehavior(
        FROM_HERE, fast_task,
        SequencedWorkerPool::BLOCK_SHUTDOWN);
    SignalWorkerDone(id);
  }

  // Waits until the given number of tasks have started executing.
  void WaitUntilTasksBlocked(size_t count) {
    {
      base::AutoLock lock(lock_);
      while (started_events_ < count)
        cond_var_.Wait();
    }
    cond_var_.Signal();
  }

  // Blocks the current thread until at least the given number of tasks are in
  // the completed vector, and then returns a copy.
  std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
    std::vector<int> ret;
    {
      base::AutoLock lock(lock_);
      while (complete_sequence_.size() < num_tasks)
        cond_var_.Wait();
      ret = complete_sequence_;
    }
    cond_var_.Signal();
    return ret;
  }

  size_t GetTasksCompletedCount() {
    base::AutoLock lock(lock_);
    return complete_sequence_.size();
  }

  void ClearCompleteSequence() {
    base::AutoLock lock(lock_);
    complete_sequence_.clear();
    started_events_ = 0;
  }

 private:
  friend class base::RefCountedThreadSafe<TestTracker>;
  ~TestTracker() {}

  void SignalWorkerDone(int id) {
    {
      base::AutoLock lock(lock_);
      complete_sequence_.push_back(id);
    }
    cond_var_.Signal();
  }

  // Protects the complete_sequence.
  base::Lock lock_;

  base::ConditionVariable cond_var_;

  // Protected by lock_.
  std::vector<int> complete_sequence_;

  // Counter of the number of "block" workers that have started.
  size_t started_events_;
};

class SequencedWorkerPoolTest : public testing::Test {
 public:
  SequencedWorkerPoolTest()
      : tracker_(new TestTracker) {
    ResetPool();
  }

  virtual ~SequencedWorkerPoolTest() {}

  virtual void SetUp() OVERRIDE {}

  virtual void TearDown() OVERRIDE {
    pool()->Shutdown();
  }

  const scoped_refptr<SequencedWorkerPool>& pool() {
    return pool_owner_->pool();
  }
  TestTracker* tracker() { return tracker_.get(); }

  // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
  // down, and creates a new instance.
  void ResetPool() {
    pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
  }

  void SetWillWaitForShutdownCallback(const Closure& callback) {
    pool_owner_->SetWillWaitForShutdownCallback(callback);
  }

  // Ensures that the given number of worker threads is created by adding
  // tasks and waiting until they complete. Worker thread creation is
  // serialized, can happen on background threads asynchronously, and doesn't
  // happen any more at shutdown. This means that if a test posts a bunch of
  // tasks and calls shutdown, fewer workers will be created than the test may
  // expect.
  //
  // This function ensures that this condition can't happen so tests can make
  // assumptions about the number of workers active. See the comment in
  // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
  // details.
  //
  // It will post tasks to the queue with id -1. It also assumes this is the
  // first thing called in a test since it will clear the complete_sequence_.
  void EnsureAllWorkersCreated() {
    // Create a bunch of threads, all waiting. This will cause that may
    // workers to be created.
    ThreadBlocker blocker;
    for (size_t i = 0; i < kNumWorkerThreads; i++) {
      pool()->PostWorkerTask(FROM_HERE,
                             base::Bind(&TestTracker::BlockTask,
                                        tracker(), -1, &blocker));
    }
    tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

    // Now wake them up and wait until they're done.
    blocker.Unblock(kNumWorkerThreads);
    tracker()->WaitUntilTasksComplete(kNumWorkerThreads);

    // Clean up the task IDs we added.
    tracker()->ClearCompleteSequence();
  }

  int has_work_call_count() const {
    return pool_owner_->has_work_call_count();
  }

 private:
  MessageLoop message_loop_;
  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
  const scoped_refptr<TestTracker> tracker_;
};

// Checks that the given number of entries are in the tasks to complete of
// the given tracker, and then signals the given event the given number of
// times. This is used to wakt up blocked background threads before blocking
// on shutdown.
void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
                                          size_t expected_tasks_to_complete,
                                          ThreadBlocker* blocker,
                                          size_t threads_to_awake) {
  EXPECT_EQ(
      expected_tasks_to_complete,
      tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());

  blocker->Unblock(threads_to_awake);
}

class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
 public:
  explicit DeletionHelper(
      const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
      : deleted_flag_(deleted_flag) {
  }

 private:
  friend class base::RefCountedThreadSafe<DeletionHelper>;
  virtual ~DeletionHelper() { deleted_flag_->data = true; }

  const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;
  DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
};

void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
                       const scoped_refptr<DeletionHelper>& helper) {
  ADD_FAILURE() << "Should never run";
}

// Tests that delayed tasks are deleted upon shutdown of the pool.
TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
  // Post something to verify the pool is started up.
  EXPECT_TRUE(pool()->PostTask(
      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));

  scoped_refptr<base::RefCountedData<bool> > deleted_flag(
      new base::RefCountedData<bool>(false));

  base::Time posted_at(base::Time::Now());
  // Post something that shouldn't run.
  EXPECT_TRUE(pool()->PostDelayedTask(
      FROM_HERE,
      base::Bind(&HoldPoolReference,
                 pool(),
                 make_scoped_refptr(new DeletionHelper(deleted_flag))),
      TestTimeouts::action_timeout()));

  std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
  ASSERT_EQ(1u, completion_sequence.size());
  ASSERT_EQ(1, completion_sequence[0]);

  pool()->Shutdown();
  // Shutdown is asynchronous, so use ResetPool() to block until the pool is
  // fully destroyed (and thus shut down).
  ResetPool();

  // Verify that we didn't block until the task was due.
  ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());

  // Verify that the deferred task has not only not run, but has also been
  // destroyed.
  ASSERT_TRUE(deleted_flag->data);
}

// Tests that same-named tokens have the same ID.
TEST_F(SequencedWorkerPoolTest, NamedTokens) {
  const std::string name1("hello");
  SequencedWorkerPool::SequenceToken token1 =
      pool()->GetNamedSequenceToken(name1);

  SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();

  const std::string name3("goodbye");
  SequencedWorkerPool::SequenceToken token3 =
      pool()->GetNamedSequenceToken(name3);

  // All 3 tokens should be different.
  EXPECT_FALSE(token1.Equals(token2));
  EXPECT_FALSE(token1.Equals(token3));
  EXPECT_FALSE(token2.Equals(token3));

  // Requesting the same name again should give the same value.
  SequencedWorkerPool::SequenceToken token1again =
      pool()->GetNamedSequenceToken(name1);
  EXPECT_TRUE(token1.Equals(token1again));

  SequencedWorkerPool::SequenceToken token3again =
      pool()->GetNamedSequenceToken(name3);
  EXPECT_TRUE(token3.Equals(token3again));
}

// Tests that posting a bunch of tasks (many more than the number of worker
// threads) runs them all.
TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
  pool()->PostWorkerTask(FROM_HERE,
                         base::Bind(&TestTracker::SlowTask, tracker(), 0));

  const size_t kNumTasks = 20;
  for (size_t i = 1; i < kNumTasks; i++) {
    pool()->PostWorkerTask(FROM_HERE,
                           base::Bind(&TestTracker::FastTask, tracker(), i));
  }

  std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
  EXPECT_EQ(kNumTasks, result.size());
}

// Tests that posting a bunch of tasks (many more than the number of
// worker threads) to two pools simultaneously runs them all twice.
// This test is meant to shake out any concurrency issues between
// pools (like histograms).
TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
  SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
  SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");

  base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
  pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
  pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);

  const size_t kNumTasks = 20;
  for (size_t i = 1; i < kNumTasks; i++) {
    base::Closure fast_task =
        base::Bind(&TestTracker::FastTask, tracker(), i);
    pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
    pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
  }

  std::vector<int> result =
      tracker()->WaitUntilTasksComplete(2*kNumTasks);
  EXPECT_EQ(2 * kNumTasks, result.size());

  pool2.pool()->Shutdown();
  pool1.pool()->Shutdown();
}

// Test that tasks with the same sequence token are executed in order but don't
// affect other tasks.
TEST_F(SequencedWorkerPoolTest, Sequence) {
  // Fill all the worker threads except one.
  const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
  ThreadBlocker background_blocker;
  for (size_t i = 0; i < kNumBackgroundTasks; i++) {
    pool()->PostWorkerTask(FROM_HERE,
                           base::Bind(&TestTracker::BlockTask,
                                      tracker(), i, &background_blocker));
  }
  tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);

  // Create two tasks with the same sequence token, one that will block on the
  // event, and one which will just complete quickly when it's run. Since there
  // is one worker thread free, the first task will start and then block, and
  // the second task should be waiting.
  ThreadBlocker blocker;
  SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
  pool()->PostSequencedWorkerTask(
      token1, FROM_HERE,
      base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
  pool()->PostSequencedWorkerTask(
      token1, FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 101));
  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());

  // Create another two tasks as above with a different token. These will be
  // blocked since there are no slots to run.
  SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
  pool()->PostSequencedWorkerTask(
      token2, FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 200));
  pool()->PostSequencedWorkerTask(
      token2, FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 201));
  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());

  // Let one background task complete. This should then let both tasks of
  // token2 run to completion in order. The second task of token1 should still
  // be blocked.
  background_blocker.Unblock(1);
  std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
  ASSERT_EQ(3u, result.size());
  EXPECT_EQ(200, result[1]);
  EXPECT_EQ(201, result[2]);

  // Finish the rest of the background tasks. This should leave some workers
  // free with the second token1 task still blocked on the first.
  background_blocker.Unblock(kNumBackgroundTasks - 1);
  EXPECT_EQ(kNumBackgroundTasks + 2,
            tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());

  // Allow the first task of token1 to complete. This should run the second.
  blocker.Unblock(1);
  result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
  ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
  EXPECT_EQ(100, result[result.size() - 2]);
  EXPECT_EQ(101, result[result.size() - 1]);
}

// Tests that any tasks posted after Shutdown are ignored.
// Disabled for flakiness.  See http://crbug.com/166451.
TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
  // Start tasks to take all the threads and block them.
  EnsureAllWorkersCreated();
  ThreadBlocker blocker;
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    pool()->PostWorkerTask(FROM_HERE,
                           base::Bind(&TestTracker::BlockTask,
                                      tracker(), i, &blocker));
  }
  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

  SetWillWaitForShutdownCallback(
      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
                 scoped_refptr<TestTracker>(tracker()), 0,
                 &blocker, kNumWorkerThreads));

  // Shutdown the worker pool. This should discard all non-blocking tasks.
  const int kMaxNewBlockingTasksAfterShutdown = 100;
  pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);

  int old_has_work_call_count = has_work_call_count();

  std::vector<int> result =
      tracker()->WaitUntilTasksComplete(kNumWorkerThreads);

  // The kNumWorkerThread items should have completed, in no particular order.
  ASSERT_EQ(kNumWorkerThreads, result.size());
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
                result.end());
  }

  // No further tasks, regardless of shutdown mode, should be allowed.
  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 100),
      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 101),
      SequencedWorkerPool::SKIP_ON_SHUTDOWN));
  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 102),
      SequencedWorkerPool::BLOCK_SHUTDOWN));

  ASSERT_EQ(old_has_work_call_count, has_work_call_count());
}

TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
  // Test that <n> new blocking tasks are allowed provided they're posted
  // by a running tasks.
  EnsureAllWorkersCreated();
  ThreadBlocker blocker;

  // Start tasks to take all the threads and block them.
  const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
  for (int i = 0; i < kNumBlockTasks; ++i) {
    EXPECT_TRUE(pool()->PostWorkerTask(
        FROM_HERE,
        base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
  }
  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

  // Queue up shutdown blocking tasks behind those which will attempt to post
  // additional tasks when run, PostAdditionalTasks attemtps to post 3
  // new FastTasks, one for each shutdown_behavior.
  const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
  for (int i = 0; i < kNumQueuedTasks; ++i) {
    EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
        FROM_HERE,
        base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
                   false),
        SequencedWorkerPool::BLOCK_SHUTDOWN));
  }

  // Setup to open the floodgates from within Shutdown().
  SetWillWaitForShutdownCallback(
      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
                 scoped_refptr<TestTracker>(tracker()),
                 0, &blocker, kNumBlockTasks));

  // Allow half of the additional blocking tasks thru.
  const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
  pool()->Shutdown(kNumNewBlockingTasksToAllow);

  // Ensure that the correct number of tasks actually got run.
  tracker()->WaitUntilTasksComplete(static_cast<size_t>(
      kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));

  // Clean up the task IDs we added and go home.
  tracker()->ClearCompleteSequence();
}

// Tests that unrun tasks are discarded properly according to their shutdown
// mode.
TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
  // Start tasks to take all the threads and block them.
  EnsureAllWorkersCreated();
  ThreadBlocker blocker;
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    pool()->PostWorkerTask(FROM_HERE,
                           base::Bind(&TestTracker::BlockTask,
                                      tracker(), i, &blocker));
  }
  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

  // Create some tasks with different shutdown modes.
  pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 100),
      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
  pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 101),
      SequencedWorkerPool::SKIP_ON_SHUTDOWN);
  pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 102),
      SequencedWorkerPool::BLOCK_SHUTDOWN);

  // Shutdown the worker pool. This should discard all non-blocking tasks.
  SetWillWaitForShutdownCallback(
      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
                 scoped_refptr<TestTracker>(tracker()), 0,
                 &blocker, kNumWorkerThreads));
  pool()->Shutdown();

  std::vector<int> result =
      tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);

  // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
  // one, in no particular order.
  ASSERT_EQ(kNumWorkerThreads + 1, result.size());
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
                result.end());
  }
  EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
}

// Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
  scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
  scoped_refptr<SequencedTaskRunner> sequenced_runner(
      pool()->GetSequencedTaskRunnerWithShutdownBehavior(
          pool()->GetSequenceToken(),
          SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
  EnsureAllWorkersCreated();
  ThreadBlocker blocker;
  pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::BlockTask,
                 tracker(), 0, &blocker),
      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
  runner->PostTask(
      FROM_HERE,
      base::Bind(&TestTracker::BlockTask,
                 tracker(), 1, &blocker));
  sequenced_runner->PostTask(
      FROM_HERE,
      base::Bind(&TestTracker::BlockTask,
                 tracker(), 2, &blocker));

  tracker()->WaitUntilTasksBlocked(3);

  // This should not block. If this test hangs, it means it failed.
  pool()->Shutdown();

  // The task should not have completed yet.
  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());

  // Posting more tasks should fail.
  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
  EXPECT_FALSE(runner->PostTask(
      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
  EXPECT_FALSE(sequenced_runner->PostTask(
      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));

  // Continue the background thread and make sure the tasks can complete.
  blocker.Unblock(3);
  std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
  EXPECT_EQ(3u, result.size());
}

// Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
// until they stop, but tasks not yet started do not.
TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
  // Start tasks to take all the threads and block them.
  EnsureAllWorkersCreated();
  ThreadBlocker blocker;

  // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
  // return until these tasks have completed.
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    pool()->PostWorkerTaskWithShutdownBehavior(
        FROM_HERE,
        base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
        SequencedWorkerPool::SKIP_ON_SHUTDOWN);
  }
  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

  // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
  // executed once Shutdown() has been called.
  pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::BlockTask,
                 tracker(), 0, &blocker),
      SequencedWorkerPool::SKIP_ON_SHUTDOWN);

  // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
  // been started block shutdown.
  SetWillWaitForShutdownCallback(
      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
                 scoped_refptr<TestTracker>(tracker()), 0,
                 &blocker, kNumWorkerThreads));

  // No tasks should have completed yet.
  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());

  // This should not block. If this test hangs, it means it failed.
  pool()->Shutdown();

  // Shutdown should not return until all of the tasks have completed.
  std::vector<int> result =
      tracker()->WaitUntilTasksComplete(kNumWorkerThreads);

  // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
  // allowed to complete. No additional non-blocking tasks should have been
  // started.
  ASSERT_EQ(kNumWorkerThreads, result.size());
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
                result.end());
  }
}

// Ensure all worker threads are created, and then trigger a spurious
// work signal. This shouldn't cause any other work signals to be
// triggered. This is a regression test for http://crbug.com/117469.
TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
  EnsureAllWorkersCreated();
  int old_has_work_call_count = has_work_call_count();
  pool()->SignalHasWorkForTesting();
  // This is inherently racy, but can only produce false positives.
  base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
  EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
}

void IsRunningOnCurrentThreadTask(
    SequencedWorkerPool::SequenceToken test_positive_token,
    SequencedWorkerPool::SequenceToken test_negative_token,
    SequencedWorkerPool* pool,
    SequencedWorkerPool* unused_pool) {
  EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
  EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
  EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
  EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
  EXPECT_FALSE(
      unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
  EXPECT_FALSE(
      unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
}

// Verify correctness of the IsRunningSequenceOnCurrentThread method.
TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
  SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
  SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
  SequencedWorkerPool::SequenceToken unsequenced_token;

  scoped_refptr<SequencedWorkerPool> unused_pool =
      new SequencedWorkerPool(2, "unused_pool");

  EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
  EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
  EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
  EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
  EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
  EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
  EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
  EXPECT_FALSE(
      unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));

  pool()->PostSequencedWorkerTask(
      token1, FROM_HERE,
      base::Bind(&IsRunningOnCurrentThreadTask,
                 token1, token2, pool(), unused_pool));
  pool()->PostSequencedWorkerTask(
      token2, FROM_HERE,
      base::Bind(&IsRunningOnCurrentThreadTask,
                 token2, unsequenced_token, pool(), unused_pool));
  pool()->PostWorkerTask(
      FROM_HERE,
      base::Bind(&IsRunningOnCurrentThreadTask,
                 unsequenced_token, token1, pool(), unused_pool));
  pool()->Shutdown();
  unused_pool->Shutdown();
}

// Verify that FlushForTesting works as intended.
TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
  // Should be fine to call on a new instance.
  pool()->FlushForTesting();

  // Queue up a bunch of work, including  a long delayed task and
  // a task that produces additional tasks as an artifact.
  pool()->PostDelayedWorkerTask(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 0),
      TimeDelta::FromMinutes(5));
  pool()->PostWorkerTask(FROM_HERE,
                         base::Bind(&TestTracker::SlowTask, tracker(), 0));
  const size_t kNumFastTasks = 20;
  for (size_t i = 0; i < kNumFastTasks; i++) {
    pool()->PostWorkerTask(FROM_HERE,
                           base::Bind(&TestTracker::FastTask, tracker(), 0));
  }
  pool()->PostWorkerTask(
      FROM_HERE,
      base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
                 true));

  // We expect all except the delayed task to have been run. We verify all
  // closures have been deleted by looking at the refcount of the
  // tracker.
  EXPECT_FALSE(tracker()->HasOneRef());
  pool()->FlushForTesting();
  EXPECT_TRUE(tracker()->HasOneRef());
  EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());

  // Should be fine to call on an idle instance with all threads created, and
  // spamming the method shouldn't deadlock or confuse the class.
  pool()->FlushForTesting();
  pool()->FlushForTesting();

  // Should be fine to call after shutdown too.
  pool()->Shutdown();
  pool()->FlushForTesting();
}

TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
  MessageLoop loop;
  scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
  scoped_refptr<SequencedTaskRunner> task_runner =
      pool->GetSequencedTaskRunnerWithShutdownBehavior(
          pool->GetSequenceToken(),
          base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);

  // Upon test exit, should shut down without hanging.
  pool->Shutdown();
}

class SequencedWorkerPoolTaskRunnerTestDelegate {
 public:
  SequencedWorkerPoolTaskRunnerTestDelegate() {}

  ~SequencedWorkerPoolTaskRunnerTestDelegate() {}

  void StartTaskRunner() {
    pool_owner_.reset(
        new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
  }

  scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
    return pool_owner_->pool();
  }

  void StopTaskRunner() {
    // Make sure all tasks are run before shutting down. Delayed tasks are
    // not run, they're simply deleted.
    pool_owner_->pool()->FlushForTesting();
    pool_owner_->pool()->Shutdown();
    // Don't reset |pool_owner_| here, as the test may still hold a
    // reference to the pool.
  }

 private:
  MessageLoop message_loop_;
  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
};

INSTANTIATE_TYPED_TEST_CASE_P(
    SequencedWorkerPool, TaskRunnerTest,
    SequencedWorkerPoolTaskRunnerTestDelegate);

class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
 public:
  SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}

  ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
  }

  void StartTaskRunner() {
    pool_owner_.reset(
        new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
    task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
        SequencedWorkerPool::BLOCK_SHUTDOWN);
  }

  scoped_refptr<TaskRunner> GetTaskRunner() {
    return task_runner_;
  }

  void StopTaskRunner() {
    // Make sure all tasks are run before shutting down. Delayed tasks are
    // not run, they're simply deleted.
    pool_owner_->pool()->FlushForTesting();
    pool_owner_->pool()->Shutdown();
    // Don't reset |pool_owner_| here, as the test may still hold a
    // reference to the pool.
  }

 private:
  MessageLoop message_loop_;
  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
  scoped_refptr<TaskRunner> task_runner_;
};

INSTANTIATE_TYPED_TEST_CASE_P(
    SequencedWorkerPoolTaskRunner, TaskRunnerTest,
    SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);

class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
 public:
  SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}

  ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
  }

  void StartTaskRunner() {
    pool_owner_.reset(new SequencedWorkerPoolOwner(
        10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
    task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
        pool_owner_->pool()->GetSequenceToken());
  }

  scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
    return task_runner_;
  }

  void StopTaskRunner() {
    // Make sure all tasks are run before shutting down. Delayed tasks are
    // not run, they're simply deleted.
    pool_owner_->pool()->FlushForTesting();
    pool_owner_->pool()->Shutdown();
    // Don't reset |pool_owner_| here, as the test may still hold a
    // reference to the pool.
  }

 private:
  MessageLoop message_loop_;
  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
  scoped_refptr<SequencedTaskRunner> task_runner_;
};

INSTANTIATE_TYPED_TEST_CASE_P(
    SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
    SequencedWorkerPoolSequencedTaskRunnerTestDelegate);

INSTANTIATE_TYPED_TEST_CASE_P(
    SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
    SequencedWorkerPoolSequencedTaskRunnerTestDelegate);

}  // namespace

}  // namespace base

/* [<][>][^][v][top][bottom][index][help] */