This source file includes the following definitions.
- shutdown_behavior_
- PostDelayedTask
- RunsTasksOnCurrentThread
- shutdown_behavior_
- SequencedWorkerPoolSequencedTaskRunner
- PostDelayedTask
- RunsTasksOnCurrentThread
- PostNonNestableDelayedTask
- GetTaskTraceID
- set_running_task_info
- running_sequence
- running_shutdown_behavior
- running_shutdown_behavior_
- Run
- testing_observer_
- GetSequenceToken
- GetNamedSequenceToken
- PostTask
- RunsTasksOnCurrentThread
- IsRunningSequenceOnCurrentThread
- CleanupForTesting
- SignalHasWorkForTesting
- Shutdown
- IsShutdownInProgress
- ThreadLoop
- HandleCleanup
- LockedGetNamedTokenID
- LockedGetNextSequenceTaskNumber
- LockedCurrentThreadShutdownBehavior
- WillRunWorkerTask
- DidRunWorkerTask
- IsSequenceTokenRunnable
- PrepareToStartAdditionalThreadIfHelpful
- FinishStartingAdditionalThread
- SignalHasWork
- CanShutdown
- GetSequenceTokenForCurrentThread
- inner_
- inner_
- OnDestruct
- GetSequenceToken
- GetNamedSequenceToken
- GetSequencedTaskRunner
- GetSequencedTaskRunnerWithShutdownBehavior
- GetTaskRunnerWithShutdownBehavior
- PostWorkerTask
- PostDelayedWorkerTask
- PostWorkerTaskWithShutdownBehavior
- PostSequencedWorkerTask
- PostDelayedSequencedWorkerTask
- PostNamedSequencedWorkerTask
- PostSequencedWorkerTaskWithShutdownBehavior
- PostDelayedTask
- RunsTasksOnCurrentThread
- IsRunningSequenceOnCurrentThread
- FlushForTesting
- SignalHasWorkForTesting
- Shutdown
- IsShutdownInProgress
#include "base/threading/sequenced_worker_pool.h"
#include <list>
#include <map>
#include <set>
#include <utility>
#include <vector>
#include "base/atomic_sequence_num.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/critical_closure.h"
#include "base/debug/trace_event.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/memory/linked_ptr.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/stl_util.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/tracked_objects.h"
#if defined(OS_MACOSX)
#include "base/mac/scoped_nsautorelease_pool.h"
#endif
#if !defined(OS_NACL)
#include "base/metrics/histogram.h"
#endif
namespace base {
namespace {
// A unit of work queued on the pool, plus the bookkeeping used to order,
// trace and run it. Derives from TrackingInfo so that a finished task can be
// passed to tracked_objects::ThreadData for run-time tallying.
struct SequencedTask : public TrackingInfo {
// Creates an empty, unsequenced task that defaults to BLOCK_SHUTDOWN.
SequencedTask()
: sequence_token_id(0),
trace_id(0),
sequence_task_number(0),
shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
// Same defaults as above, but records |from_here| in the TrackingInfo base.
explicit SequencedTask(const tracked_objects::Location& from_here)
: base::TrackingInfo(from_here, TimeTicks()),
sequence_token_id(0),
trace_id(0),
sequence_task_number(0),
shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
~SequencedTask() {}
// ID of the sequence this task belongs to; 0 means the task is unsequenced.
int sequence_token_id;
// Per-pool counter value used to build the task's tracing flow ID.
int trace_id;
// Posting order within the pool; breaks ties between equal run times.
int64 sequence_task_number;
// How the task is treated once the pool is shutting down.
SequencedWorkerPool::WorkerShutdown shutdown_behavior;
// Location the task was posted from; reported in trace events.
tracked_objects::Location posted_from;
// The closure to run.
Closure task;
// Earliest time at which the task may be run.
TimeTicks time_to_run;
};
// Strict weak ordering for the pending-task set: earlier |time_to_run| sorts
// first, and tasks due at the same time keep their posting order via
// |sequence_task_number|.
struct SequencedTaskLessThan {
  bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
    const bool due_earlier = lhs.time_to_run < rhs.time_to_run;
    const bool due_later = lhs.time_to_run > rhs.time_to_run;
    // Run times differ: they alone decide the order.
    if (due_earlier || due_later)
      return due_earlier;
    // Same run time: fall back to the order in which the tasks were posted.
    return lhs.sequence_task_number < rhs.sequence_task_number;
  }
};
// TaskRunner that posts unsequenced tasks to a SequencedWorkerPool on behalf
// of a caller that chose a fixed shutdown behavior up front.
class SequencedWorkerPoolTaskRunner : public TaskRunner {
public:
SequencedWorkerPoolTaskRunner(
const scoped_refptr<SequencedWorkerPool>& pool,
SequencedWorkerPool::WorkerShutdown shutdown_behavior);
// TaskRunner implementation.
virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay) OVERRIDE;
virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
private:
virtual ~SequencedWorkerPoolTaskRunner();
// Pool that receives the posted tasks; kept alive by this reference.
const scoped_refptr<SequencedWorkerPool> pool_;
// Shutdown behavior applied to tasks posted with a zero delay.
const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
};
// Binds the runner to |pool| and remembers the shutdown behavior to use for
// the tasks it posts.
SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
    const scoped_refptr<SequencedWorkerPool>& pool,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(pool), shutdown_behavior_(shutdown_behavior) {}
// Nothing to release explicitly; |pool_| drops its reference on its own.
SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {}
// Posts |task| to the pool as an unsequenced task. Returns whatever the pool
// reports for the post.
bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  const bool run_immediately = (delay == TimeDelta());
  // Delayed posts go through the pool's delayed entry point, which does not
  // take this runner's shutdown behavior.
  if (!run_immediately)
    return pool_->PostDelayedWorkerTask(from_here, task, delay);
  return pool_->PostWorkerTaskWithShutdownBehavior(
      from_here, task, shutdown_behavior_);
}
// Defers to the pool's own answer for the calling thread.
bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
  const bool on_pool_thread = pool_->RunsTasksOnCurrentThread();
  return on_pool_thread;
}
// SequencedTaskRunner that posts every task to a SequencedWorkerPool under a
// single sequence token, with a fixed shutdown behavior.
class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
public:
SequencedWorkerPoolSequencedTaskRunner(
const scoped_refptr<SequencedWorkerPool>& pool,
SequencedWorkerPool::SequenceToken token,
SequencedWorkerPool::WorkerShutdown shutdown_behavior);
// TaskRunner implementation.
virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay) OVERRIDE;
virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
// SequencedTaskRunner implementation.
virtual bool PostNonNestableDelayedTask(
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay) OVERRIDE;
private:
virtual ~SequencedWorkerPoolSequencedTaskRunner();
// Pool that receives the posted tasks; kept alive by this reference.
const scoped_refptr<SequencedWorkerPool> pool_;
// Sequence token attached to every task posted through this runner.
const SequencedWorkerPool::SequenceToken token_;
// Shutdown behavior applied to tasks posted with a zero delay.
const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
};
// Binds the runner to |pool|, the sequence |token| and the shutdown behavior
// used for the tasks it posts.
SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
    const scoped_refptr<SequencedWorkerPool>& pool,
    SequencedWorkerPool::SequenceToken token,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(pool), token_(token), shutdown_behavior_(shutdown_behavior) {}
// Nothing to release explicitly; |pool_| drops its reference on its own.
SequencedWorkerPoolSequencedTaskRunner::
    ~SequencedWorkerPoolSequencedTaskRunner() {}
// Posts |task| to the pool under this runner's sequence token. Returns
// whatever the pool reports for the post.
bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  const bool run_immediately = (delay == TimeDelta());
  // Delayed posts go through the pool's delayed entry point, which does not
  // take this runner's shutdown behavior.
  if (!run_immediately) {
    return pool_->PostDelayedSequencedWorkerTask(
        token_, from_here, task, delay);
  }
  return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
      token_, from_here, task, shutdown_behavior_);
}
// True only when the calling thread is a pool worker currently running a task
// of this runner's sequence.
bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
  const bool in_sequence = pool_->IsRunningSequenceOnCurrentThread(token_);
  return in_sequence;
}
// Non-nestable posts are handled exactly like ordinary posts: this simply
// forwards to PostDelayedTask().
bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  const bool posted = PostDelayedTask(from_here, task, delay);
  return posted;
}
// Builds the 64-bit ID that ties a task's TRACE_EVENT_FLOW_BEGIN/END pair
// together: the task's |trace_id| shifted into the upper 32 bits, OR-ed with
// the address of |pool|.
//
// The address is converted through the unsigned uintptr_t. Converting through
// the signed intptr_t would sign-extend a 32-bit address whose top bit is set
// when it is widened to 64 bits, filling the upper half with ones and wiping
// out the |trace_id| part of the ID.
uint64 GetTaskTraceID(const SequencedTask& task,
                      void* pool) {
  const uint64 task_bits = static_cast<uint64>(task.trace_id) << 32;
  const uint64 pool_bits =
      static_cast<uint64>(reinterpret_cast<uintptr_t>(pool));
  return task_bits | pool_bits;
}
// Thread-local pointer to the SequenceToken of the worker running on the
// current thread. Worker::Run() points it at the worker's running-sequence
// field, and SequencedWorkerPool::GetSequenceTokenForCurrentThread() reads it.
base::LazyInstance<base::ThreadLocalPointer<
SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
LAZY_INSTANCE_INITIALIZER;
}
// One worker thread of the pool. The thread body simply calls back into
// Inner::ThreadLoop(); this object additionally records which sequence and
// shutdown behavior the thread's current task has.
class SequencedWorkerPool::Worker : public SimpleThread {
public:
// Creates the worker and immediately starts its thread. The thread is named
// |thread_name_prefix| followed by "Worker" and |thread_number|.
Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
int thread_number,
const std::string& thread_name_prefix);
virtual ~Worker();
// SimpleThread implementation: runs the pool's thread loop.
virtual void Run() OVERRIDE;
// Records the sequence token and shutdown behavior of the task this thread is
// about to run (or resets them once the task has finished).
void set_running_task_info(SequenceToken token,
WorkerShutdown shutdown_behavior) {
running_sequence_ = token;
running_shutdown_behavior_ = shutdown_behavior;
}
// Sequence token most recently recorded by set_running_task_info().
SequenceToken running_sequence() const {
return running_sequence_;
}
// Shutdown behavior most recently recorded by set_running_task_info();
// CONTINUE_ON_SHUTDOWN until the first task runs.
WorkerShutdown running_shutdown_behavior() const {
return running_shutdown_behavior_;
}
private:
// Reference to the owning pool; cleared at the end of Run().
scoped_refptr<SequencedWorkerPool> worker_pool_;
SequenceToken running_sequence_;
WorkerShutdown running_shutdown_behavior_;
DISALLOW_COPY_AND_ASSIGN(Worker);
};
// Holds the pool's state and implements its logic. SequencedWorkerPool
// forwards its public API to this class, and every Worker thread executes
// ThreadLoop().
class SequencedWorkerPool::Inner {
public:
// |observer| may be NULL; when set it is notified of work signals, of waiting
// for shutdown and of destruction.
Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
const std::string& thread_name_prefix,
TestingObserver* observer);
// Joins every worker thread. Shutdown() must have been called first.
~Inner();
// Returns a fresh, non-zero sequence token.
SequenceToken GetSequenceToken();
// Returns the token registered for |name|, creating it on first use.
SequenceToken GetNamedSequenceToken(const std::string& name);
// Queues |task|. When |optional_token_name| is non-NULL the named token is
// used in place of |sequence_token|. Returns false if the task was rejected
// because the pool is shutting down.
bool PostTask(const std::string* optional_token_name,
SequenceToken sequence_token,
WorkerShutdown shutdown_behavior,
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay);
// True when the calling thread is one of this pool's registered workers.
bool RunsTasksOnCurrentThread() const;
// True when the calling thread is a worker whose recorded running sequence
// equals |sequence_token|.
bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
// Blocks until the workers have drained the queue through the cleanup state
// machine. Must not be called from a worker thread.
void CleanupForTesting();
void SignalHasWorkForTesting();
int GetWorkSignalCountForTesting() const;
// Marks the pool as shut down and blocks until CanShutdown() holds.
// |max_blocking_tasks_after_shutdown| is the number of BLOCK_SHUTDOWN tasks
// that may still be posted afterwards.
void Shutdown(int max_blocking_tasks_after_shutdown);
bool IsShutdownInProgress();
// Body of each worker thread.
void ThreadLoop(Worker* this_worker);
private:
// Result of GetWork().
enum GetWorkStatus {
// A task was handed back and should be run.
GET_WORK_FOUND,
// No task is available to run.
GET_WORK_NOT_FOUND,
// The next candidate task is not due yet; a wait time was reported.
GET_WORK_WAIT,
};
// States of the cleanup protocol started by CleanupForTesting() and advanced
// by HandleCleanup() and ThreadLoop().
enum CleanupState {
CLEANUP_REQUESTED,
CLEANUP_STARTING,
CLEANUP_RUNNING,
CLEANUP_FINISHING,
CLEANUP_DONE,
};
// The Locked* helpers below assert that |lock_| is held by the caller.
int LockedGetNamedTokenID(const std::string& name);
int64 LockedGetNextSequenceTaskNumber();
WorkerShutdown LockedCurrentThreadShutdownBehavior() const;
// Picks the next task to run, if any. Closures that were dropped instead of
// run are appended to |delete_these_outside_lock| so the caller can destroy
// them after releasing |lock_|.
GetWorkStatus GetWork(SequencedTask* task,
TimeDelta* wait_time,
std::vector<Closure>* delete_these_outside_lock);
void HandleCleanup();
// Bookkeeping around running |task|. WillRunWorkerTask() returns a non-zero
// thread number when an additional thread should be started.
int WillRunWorkerTask(const SequencedTask& task);
void DidRunWorkerTask(const SequencedTask& task);
// True when no task of |sequence_token_id| is currently running (or the ID is
// 0, i.e. unsequenced).
bool IsSequenceTokenRunnable(int sequence_token_id) const;
// Two-step thread creation: the decision is made under |lock_|, the Worker is
// created afterwards.
int PrepareToStartAdditionalThreadIfHelpful();
void FinishStartingAdditionalThread(int thread_number);
void SignalHasWork();
bool CanShutdown() const;
SequencedWorkerPool* const worker_pool_;
// Source of sequence token IDs; static, so shared by all Inner instances.
static base::StaticAtomicSequenceNumber g_last_sequence_number_;
// Guards the mutable state below. Mutable so that const methods can lock it.
mutable Lock lock_;
// Signaled when workers should re-check for work or for shutdown.
ConditionVariable has_work_cv_;
// Signaled by exiting workers; Shutdown() waits on it.
ConditionVariable can_shutdown_cv_;
const size_t max_threads_;
const std::string thread_name_prefix_;
// Maps sequence names to their token IDs.
std::map<std::string, int> named_sequence_tokens_;
// Workers that have entered ThreadLoop(), keyed by thread ID.
typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
ThreadMap threads_;
// True between deciding to start a thread and that thread registering itself.
bool thread_being_created_;
// Number of workers currently blocked waiting on |has_work_cv_|.
size_t waiting_thread_count_;
// Number of workers running a task whose behavior is not CONTINUE_ON_SHUTDOWN.
size_t blocking_shutdown_thread_count_;
// Queued tasks, ordered by SequencedTaskLessThan.
typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
PendingTaskSet pending_tasks_;
int64 next_sequence_task_number_;
// Number of queued BLOCK_SHUTDOWN tasks.
size_t blocking_shutdown_pending_task_count_;
// Sequence token IDs that currently have a task running.
std::set<int> current_sequences_;
// Next value handed out as a task's trace_id.
int trace_id_;
bool shutdown_called_;
// Remaining number of BLOCK_SHUTDOWN tasks accepted after shutdown.
int max_blocking_tasks_after_shutdown_;
CleanupState cleanup_state_;
// Number of workers parked in HandleCleanup() while another one cleans up.
size_t cleanup_idlers_;
ConditionVariable cleanup_cv_;
// Optional test hook; may be NULL.
TestingObserver* const testing_observer_;
DISALLOW_COPY_AND_ASSIGN(Inner);
};
// Names the thread "<prefix>Worker<thread_number>", takes a reference on the
// pool and starts the thread right away. The running shutdown behavior starts
// out as CONTINUE_ON_SHUTDOWN until a task is assigned.
SequencedWorkerPool::Worker::Worker(
    const scoped_refptr<SequencedWorkerPool>& worker_pool,
    int thread_number,
    const std::string& prefix)
    : SimpleThread(
          prefix + StringPrintf("Worker%d", thread_number).c_str()),
      worker_pool_(worker_pool),
      running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) {
  Start();
}
// No explicit teardown is required here.
SequencedWorkerPool::Worker::~Worker() {}
// Thread entry point.
void SequencedWorkerPool::Worker::Run() {
  // Publish this worker's running-sequence slot through thread-local storage
  // so GetSequenceTokenForCurrentThread() can find it.
  g_lazy_tls_ptr.Get().Set(&running_sequence_);

  // All queueing and tracking lives in Inner; run its loop until it returns.
  worker_pool_->inner_->ThreadLoop(this);

  // The loop is over: drop this worker's reference to the pool.
  worker_pool_ = NULL;
}
// Sets up an idle pool: no threads, no pending tasks, not shut down, and the
// cleanup state machine at CLEANUP_DONE. All three condition variables are
// bound to |lock_|.
SequencedWorkerPool::Inner::Inner(
    SequencedWorkerPool* worker_pool,
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : worker_pool_(worker_pool),
      lock_(),
      has_work_cv_(&lock_),
      can_shutdown_cv_(&lock_),
      max_threads_(max_threads),
      thread_name_prefix_(thread_name_prefix),
      thread_being_created_(false),
      waiting_thread_count_(0),
      blocking_shutdown_thread_count_(0),
      next_sequence_task_number_(0),
      blocking_shutdown_pending_task_count_(0),
      trace_id_(0),
      shutdown_called_(false),
      max_blocking_tasks_after_shutdown_(0),
      cleanup_state_(CLEANUP_DONE),
      cleanup_idlers_(0),
      cleanup_cv_(&lock_),
      testing_observer_(observer) {
}
// Joins all worker threads, drops them, then notifies the testing observer
// (if any). Shutdown() must already have been called.
SequencedWorkerPool::Inner::~Inner() {
  DCHECK(shutdown_called_);

  // Join every worker before the map releases the Worker objects.
  ThreadMap::iterator worker = threads_.begin();
  while (worker != threads_.end()) {
    worker->second->Join();
    ++worker;
  }
  threads_.clear();

  if (testing_observer_)
    testing_observer_->OnDestruct();
}
// Returns a new token. The counter value is offset by one so that the result
// is never 0, the ID this file treats as "unsequenced".
SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetSequenceToken() {
  const int next_id = g_last_sequence_number_.GetNext() + 1;
  return SequenceToken(next_id);
}
// Thread-safe lookup (or creation) of the token registered under |name|.
SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
  AutoLock lock(lock_);
  const int token_id = LockedGetNamedTokenID(name);
  return SequenceToken(token_id);
}
// Queues |task| for execution and wakes or creates a worker for it.
//
// |optional_token_name|, when non-NULL, selects a named sequence and
// overrides |sequence_token|. A non-zero |delay| is only expected together
// with SKIP_ON_SHUTDOWN. Returns false when the task is rejected because the
// pool is shutting down; true once the task has been queued.
bool SequencedWorkerPool::Inner::PostTask(
    const std::string* optional_token_name,
    SequenceToken sequence_token,
    WorkerShutdown shutdown_behavior,
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);

  // Everything that does not need the lock is filled in up front.
  SequencedTask queued(from_here);
  queued.sequence_token_id = sequence_token.id_;
  queued.shutdown_behavior = shutdown_behavior;
  queued.posted_from = from_here;
  if (shutdown_behavior == BLOCK_SHUTDOWN)
    queued.task = base::MakeCriticalClosure(task);
  else
    queued.task = task;
  queued.time_to_run = TimeTicks::Now() + delay;

  int thread_number_to_start = 0;
  {
    AutoLock lock(lock_);

    if (shutdown_called_) {
      // After shutdown only BLOCK_SHUTDOWN tasks are accepted, and only when
      // the posting thread is not running a CONTINUE_ON_SHUTDOWN task.
      const bool may_post_during_shutdown =
          shutdown_behavior == BLOCK_SHUTDOWN &&
          LockedCurrentThreadShutdownBehavior() != CONTINUE_ON_SHUTDOWN;
      if (!may_post_during_shutdown)
        return false;

      // ... and only while the post-shutdown budget lasts.
      if (max_blocking_tasks_after_shutdown_ <= 0) {
        DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
        return false;
      }
      max_blocking_tasks_after_shutdown_ -= 1;
    }

    // Identifies the task in tracing; paired with the FLOW_END in ThreadLoop.
    queued.trace_id = trace_id_++;
    TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
        "SequencedWorkerPool::PostTask",
        TRACE_ID_MANGLE(GetTaskTraceID(queued, static_cast<void*>(this))));

    queued.sequence_task_number = LockedGetNextSequenceTaskNumber();

    // Named tokens can only be resolved while holding the lock.
    if (optional_token_name)
      queued.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);

    pending_tasks_.insert(queued);
    if (shutdown_behavior == BLOCK_SHUTDOWN)
      blocking_shutdown_pending_task_count_++;

    thread_number_to_start = PrepareToStartAdditionalThreadIfHelpful();
  }

  // Outside the lock: either create the new worker or wake an existing one.
  if (thread_number_to_start)
    FinishStartingAdditionalThread(thread_number_to_start);
  else
    SignalHasWork();

  return true;
}
// True when the calling thread's ID is registered in |threads_|.
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
  AutoLock lock(lock_);
  return threads_.find(PlatformThread::CurrentId()) != threads_.end();
}
// True when the calling thread is a registered worker whose recorded running
// sequence equals |sequence_token|. Non-worker threads always get false.
bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
    SequenceToken sequence_token) const {
  AutoLock lock(lock_);
  ThreadMap::const_iterator self = threads_.find(PlatformThread::CurrentId());
  return self != threads_.end() &&
         sequence_token.Equals(self->second->running_sequence());
}
// Test-only: requests a cleanup pass from the workers and blocks until the
// cleanup state machine is back at CLEANUP_DONE. Must not be called from a
// worker thread, and must not overlap with another cleanup.
void SequencedWorkerPool::Inner::CleanupForTesting() {
  DCHECK(!RunsTasksOnCurrentThread());
  base::ThreadRestrictions::ScopedAllowWait allow_wait;
  AutoLock lock(lock_);
  CHECK_EQ(CLEANUP_DONE, cleanup_state_);

  // Nothing to do once shut down, or when the queue is empty and every
  // registered thread is already waiting for work.
  const bool pool_is_idle =
      pending_tasks_.empty() && waiting_thread_count_ == threads_.size();
  if (shutdown_called_ || pool_is_idle)
    return;

  cleanup_state_ = CLEANUP_REQUESTED;
  cleanup_idlers_ = 0;
  has_work_cv_.Signal();  // Wake a worker so it notices the request.
  while (cleanup_state_ != CLEANUP_DONE)
    cleanup_cv_.Wait();
}
void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
SignalHasWork();
}
// Puts the pool into shutdown mode and blocks until CanShutdown() holds, i.e.
// until no thread is being created and no running or pending task blocks
// shutdown. Later calls are no-ops.
//
// |max_new_blocking_tasks_after_shutdown| is the number of BLOCK_SHUTDOWN
// tasks that may still be posted after this call; it must be >= 0.
void SequencedWorkerPool::Inner::Shutdown(
    int max_new_blocking_tasks_after_shutdown) {
  DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
  {
    AutoLock lock(lock_);
    // A cleanup pass must not be in flight while shutting down.
    CHECK_EQ(CLEANUP_DONE, cleanup_state_);
    if (shutdown_called_)
      return;
    shutdown_called_ = true;
    max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;

    // Wake a worker so it sees the shutdown flag; exiting workers wake the
    // next one in turn (see the end of ThreadLoop).
    SignalHasWork();

    // Fast path: nothing is blocking shutdown, so there is nothing to wait for.
    const bool nothing_blocks_shutdown = CanShutdown();
    if (nothing_blocks_shutdown)
      return;
  }

  // Slow path: block until the remaining blocking work has drained.
  if (testing_observer_)
    testing_observer_->WillWaitForShutdown();

#if !defined(OS_NACL)
  TimeTicks shutdown_wait_begin = TimeTicks::Now();
#endif

  {
    base::ThreadRestrictions::ScopedAllowWait allow_wait;
    AutoLock lock(lock_);
    while (!CanShutdown())
      can_shutdown_cv_.Wait();
  }

#if !defined(OS_NACL)
  UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
                      TimeTicks::Now() - shutdown_wait_begin);
#endif
}
// Reports, under the lock, whether Shutdown() has been called.
bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
  AutoLock lock(lock_);
  const bool shutting_down = shutdown_called_;
  return shutting_down;
}
// Body of every worker thread. Registers |this_worker| in |threads_|, then
// repeatedly asks GetWork() for a task and runs it with |lock_| released,
// waits on |has_work_cv_| when there is nothing to run, and takes part in the
// testing cleanup protocol. Returns once shutdown has been requested and no
// BLOCK_SHUTDOWN task is pending.
//
// GetWork() hands back, in |delete_these_outside_lock|, closures that were
// dropped instead of run. They must be destroyed while |lock_| is NOT held:
// destroying a closure can run arbitrary destructors. Every path through the
// loop therefore empties that vector under an AutoUnlock before the vector
// itself goes out of scope.
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
  {
    AutoLock lock(lock_);
    // This thread was announced by PrepareToStartAdditionalThreadIfHelpful();
    // it is now running, so clear the flag and register it.
    DCHECK(thread_being_created_);
    thread_being_created_ = false;
    std::pair<ThreadMap::iterator, bool> result =
        threads_.insert(
            std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
    DCHECK(result.second);

    while (true) {
#if defined(OS_MACOSX)
      base::mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

      HandleCleanup();

      SequencedTask task;
      TimeDelta wait_time;
      std::vector<Closure> delete_these_outside_lock;
      GetWorkStatus status =
          GetWork(&task, &wait_time, &delete_these_outside_lock);
      if (status == GET_WORK_FOUND) {
        TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
            "SequencedWorkerPool::PostTask",
            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
        TRACE_EVENT2("toplevel", "SequencedWorkerPool::ThreadLoop",
                     "src_file", task.posted_from.file_name(),
                     "src_func", task.posted_from.function_name());
        int new_thread_id = WillRunWorkerTask(task);
        {
          AutoUnlock unlock(lock_);
          // More work may be available, so wake another worker.
          SignalHasWork();
          delete_these_outside_lock.clear();

          // Finish creating the additional thread outside the lock.
          if (new_thread_id)
            FinishStartingAdditionalThread(new_thread_id);

          this_worker->set_running_task_info(
              SequenceToken(task.sequence_token_id), task.shutdown_behavior);

          tracked_objects::TrackedTime start_time =
              tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally);

          task.task.Run();

          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task,
              start_time, tracked_objects::ThreadData::NowForEndOfRun());

          // Release the closure while still unlocked, and before the running
          // task info is reset.
          task.task = Closure();

          this_worker->set_running_task_info(
              SequenceToken(), CONTINUE_ON_SHUTDOWN);
        }
        DidRunWorkerTask(task);  // Needs the lock, which is held again here.
      } else if (cleanup_state_ == CLEANUP_RUNNING) {
        switch (status) {
          case GET_WORK_WAIT: {
              // A not-yet-due task was dropped by GetWork(); destroy it
              // outside the lock and look again.
              AutoUnlock unlock(lock_);
              delete_these_outside_lock.clear();
            }
            break;
          case GET_WORK_NOT_FOUND:
            // The queue is drained: let the idling workers resume.
            CHECK(delete_these_outside_lock.empty());
            cleanup_state_ = CLEANUP_FINISHING;
            cleanup_cv_.Broadcast();
            break;
          default:
            NOTREACHED();
        }
      } else {
        // Shutting down with no BLOCK_SHUTDOWN task left in the queue: this
        // worker can exit.
        if (shutdown_called_ &&
            blocking_shutdown_pending_task_count_ == 0) {
          // Destroy any tasks GetWork() dropped without holding the lock.
          AutoUnlock unlock(lock_);
          delete_these_outside_lock.clear();
          break;
        }

        // Nothing to run, but GetWork() dropped some tasks. Destroy them
        // outside the lock. The queue may have changed while the lock was
        // released, so |status| is stale: go back to the top and ask again
        // instead of waiting on it.
        if (!delete_these_outside_lock.empty()) {
          AutoUnlock unlock(lock_);
          delete_these_outside_lock.clear();
          continue;
        }

        waiting_thread_count_++;

        switch (status) {
          case GET_WORK_NOT_FOUND:
            has_work_cv_.Wait();
            break;
          case GET_WORK_WAIT:
            has_work_cv_.TimedWait(wait_time);
            break;
          default:
            NOTREACHED();
        }
        waiting_thread_count_--;
      }
    }
  }  // Releases |lock_|.

  // Shutdown() signals only once, so pass the wake-up on to the next worker so
  // that it can exit as well.
  SignalHasWork();

  // This thread no longer blocks shutdown; let Shutdown() re-check.
  can_shutdown_cv_.Signal();
}
// Advances the testing cleanup protocol for the calling worker. Requires
// |lock_|; may block on |cleanup_cv_|.
//
// The worker that observes CLEANUP_REQUESTED becomes the cleaner and waits
// until every other registered thread is idling here. Workers that observe
// CLEANUP_STARTING park until the state reaches CLEANUP_FINISHING. A worker
// that observes CLEANUP_FINISHING waits for the idlers to leave and then
// marks the cleanup CLEANUP_DONE.
void SequencedWorkerPool::Inner::HandleCleanup() {
  lock_.AssertAcquired();

  switch (cleanup_state_) {
    case CLEANUP_DONE:
    case CLEANUP_RUNNING:
      // Nothing for this function to do in these states.
      return;

    case CLEANUP_REQUESTED:
      // This worker takes on the cleanup. Keep nudging the others until all
      // of them (everyone but this thread) are idling and no thread is still
      // being created.
      cleanup_state_ = CLEANUP_STARTING;
      while (thread_being_created_ ||
             cleanup_idlers_ != threads_.size() - 1) {
        has_work_cv_.Signal();
        cleanup_cv_.Wait();
      }
      cleanup_state_ = CLEANUP_RUNNING;
      return;

    case CLEANUP_STARTING:
      // Another worker is cleaning up: count this one as idle and park until
      // the cleanup reaches its finishing stage.
      ++cleanup_idlers_;
      cleanup_cv_.Broadcast();
      while (cleanup_state_ != CLEANUP_FINISHING) {
        cleanup_cv_.Wait();
      }
      --cleanup_idlers_;
      cleanup_cv_.Broadcast();
      return;

    case CLEANUP_FINISHING:
      // Wait for every idler to wake up and leave before declaring the
      // cleanup done.
      while (cleanup_idlers_ != 0) {
        cleanup_cv_.Broadcast();
        cleanup_cv_.Wait();
      }
      // The state may have moved on while waiting; only finish it once.
      if (cleanup_state_ == CLEANUP_FINISHING) {
        cleanup_state_ = CLEANUP_DONE;
        cleanup_cv_.Signal();
      }
      return;
  }
}
int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
const std::string& name) {
lock_.AssertAcquired();
DCHECK(!name.empty());
std::map<std::string, int>::const_iterator found =
named_sequence_tokens_.find(name);
if (found != named_sequence_tokens_.end())
return found->second;
SequenceToken result = GetSequenceToken();
named_sequence_tokens_.insert(std::make_pair(name, result.id_));
return result.id_;
}
// Hands out the next posting-order number (post-increment). Requires |lock_|.
int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
  lock_.AssertAcquired();
  const int64 assigned = next_sequence_task_number_;
  next_sequence_task_number_++;
  return assigned;
}
// Returns the shutdown behavior recorded for the calling thread's worker, or
// CONTINUE_ON_SHUTDOWN when the caller is not a registered worker thread.
// Requires |lock_|.
SequencedWorkerPool::WorkerShutdown
SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const {
  lock_.AssertAcquired();
  ThreadMap::const_iterator self = threads_.find(PlatformThread::CurrentId());
  return self == threads_.end() ? CONTINUE_ON_SHUTDOWN
                                : self->second->running_shutdown_behavior();
}
// Scans the pending tasks in order and decides what the calling worker should
// do next. Requires |lock_|.
//
// Returns GET_WORK_FOUND with |*task| filled in (and removed from the queue),
// GET_WORK_WAIT with |*wait_time| set when the first eligible task is not due
// yet, or GET_WORK_NOT_FOUND otherwise. Closures of tasks that are dropped
// rather than run are appended to |*delete_these_outside_lock| so the caller
// can destroy them after releasing the lock.
SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
    SequencedTask* task,
    TimeDelta* wait_time,
    std::vector<Closure>* delete_these_outside_lock) {
  lock_.AssertAcquired();

#if !defined(OS_NACL)
  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
                           static_cast<int>(pending_tasks_.size()));
#endif

  GetWorkStatus outcome = GET_WORK_NOT_FOUND;
  int blocked_by_sequence = 0;
  PendingTaskSet::iterator candidate = pending_tasks_.begin();
  const TimeTicks now = TimeTicks::Now();

  while (candidate != pending_tasks_.end()) {
    // Skip tasks whose sequence already has a task running.
    if (!IsSequenceTokenRunnable(candidate->sequence_token_id)) {
      ++blocked_by_sequence;
      ++candidate;
      continue;
    }

    // During shutdown, anything that does not block shutdown is dropped.
    if (shutdown_called_ && candidate->shutdown_behavior != BLOCK_SHUTDOWN) {
      delete_these_outside_lock->push_back(candidate->task);
      pending_tasks_.erase(candidate++);
      continue;
    }

    // The first eligible task is not due yet: report how long to wait.
    if (candidate->time_to_run > now) {
      *wait_time = candidate->time_to_run - now;
      outcome = GET_WORK_WAIT;
      if (cleanup_state_ == CLEANUP_RUNNING) {
        // A cleanup pass drops deferred tasks instead of waiting for them.
        delete_these_outside_lock->push_back(candidate->task);
        pending_tasks_.erase(candidate);
      }
      break;
    }

    // Runnable and due: hand it to the caller.
    *task = *candidate;
    pending_tasks_.erase(candidate);
    if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
      blocking_shutdown_pending_task_count_--;
    }
    outcome = GET_WORK_FOUND;
    break;
  }

#if !defined(OS_NACL)
  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
                           blocked_by_sequence);
#endif

  return outcome;
}
// Bookkeeping done (under |lock_|) just before |task| runs. Returns the
// number of an additional thread to start, or 0 if none should be started.
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
  lock_.AssertAcquired();

  // Mark the task's sequence as busy; unsequenced tasks (ID 0) are not tracked.
  if (task.sequence_token_id != 0)
    current_sequences_.insert(task.sequence_token_id);

  // Any task other than CONTINUE_ON_SHUTDOWN holds up shutdown while it runs.
  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
    ++blocking_shutdown_thread_count_;

  return PrepareToStartAdditionalThreadIfHelpful();
}
// Undoes WillRunWorkerTask()'s bookkeeping once |task| has finished. Requires
// |lock_|.
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
  lock_.AssertAcquired();

  const bool blocked_shutdown = task.shutdown_behavior != CONTINUE_ON_SHUTDOWN;
  if (blocked_shutdown) {
    DCHECK_GT(blocking_shutdown_thread_count_, 0u);
    --blocking_shutdown_thread_count_;
  }

  // The sequence is free again for its next task.
  if (task.sequence_token_id != 0)
    current_sequences_.erase(task.sequence_token_id);
}
// A task may run when it is unsequenced (ID 0) or when no other task of its
// sequence is currently running. Requires |lock_|.
bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
    int sequence_token_id) const {
  lock_.AssertAcquired();
  if (sequence_token_id == 0)
    return true;
  return current_sequences_.count(sequence_token_id) == 0;
}
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
lock_.AssertAcquired();
if (!shutdown_called_ &&
!thread_being_created_ &&
cleanup_state_ == CLEANUP_DONE &&
threads_.size() < max_threads_ &&
waiting_thread_count_ == 0) {
for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
i != pending_tasks_.end(); ++i) {
if (IsSequenceTokenRunnable(i->sequence_token_id)) {
thread_being_created_ = true;
return static_cast<int>(threads_.size() + 1);
}
}
}
return 0;
}
// Second half of thread creation. Callers in this file invoke it after
// releasing |lock_|.
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
    int thread_number) {
  DCHECK(thread_number > 0);

  // The raw pointer is intentionally not stored here: the new thread adds
  // itself to |threads_| from ThreadLoop(), wrapped in a linked_ptr.
  new Worker(worker_pool_, thread_number, thread_name_prefix_);
}
// Wakes one thread waiting on |has_work_cv_| and tells the testing observer,
// if there is one.
void SequencedWorkerPool::Inner::SignalHasWork() {
  has_work_cv_.Signal();
  if (!testing_observer_)
    return;
  testing_observer_->OnHasWork();
}
// Shutdown may complete only when no thread is in the middle of being
// created, no running task blocks shutdown and no BLOCK_SHUTDOWN task is
// still queued. Requires |lock_|.
bool SequencedWorkerPool::Inner::CanShutdown() const {
  lock_.AssertAcquired();
  if (thread_being_created_)
    return false;
  if (blocking_shutdown_thread_count_ != 0)
    return false;
  return blocking_shutdown_pending_task_count_ == 0;
}
// Out-of-class definition of the static counter that
// Inner::GetSequenceToken() draws token IDs from.
base::StaticAtomicSequenceNumber
SequencedWorkerPool::Inner::g_last_sequence_number_;
// Returns the sequence token recorded for the calling worker thread, or a
// default-constructed token when the thread-local slot was never created or
// is unset on this thread.
SequencedWorkerPool::SequenceToken
SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
  // Check the lazy instance without forcing its construction.
  if (g_lazy_tls_ptr == NULL)
    return SequenceToken();

  SequencedWorkerPool::SequenceToken* current = g_lazy_tls_ptr.Get().Get();
  return current ? *current : SequenceToken();
}
// Creates a pool without a testing observer. The message loop of the
// constructing thread is remembered for use by OnDestruct() and Shutdown().
SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {}
// Same as the two-argument constructor, but additionally passes |observer|
// on to Inner as its testing observer.
SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {}
// Members clean up after themselves; nothing to do explicitly.
SequencedWorkerPool::~SequencedWorkerPool() {
}
// Destroys the pool. Deletion is never performed on one of the pool's own
// worker threads (Inner's destructor joins those threads); in that case it is
// handed to the message loop the pool was constructed on.
void SequencedWorkerPool::OnDestruct() const {
  DCHECK(constructor_message_loop_.get());
  if (!RunsTasksOnCurrentThread()) {
    delete this;
    return;
  }
  constructor_message_loop_->DeleteSoon(FROM_HERE, this);
}
// Forwards to Inner to obtain a fresh sequence token.
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
  const SequenceToken fresh = inner_->GetSequenceToken();
  return fresh;
}
// Forwards to Inner to look up (or create) the token registered for |name|.
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
    const std::string& name) {
  const SequenceToken named = inner_->GetNamedSequenceToken(name);
  return named;
}
// Convenience overload: a sequenced task runner for |token| whose tasks use
// BLOCK_SHUTDOWN.
scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
    SequenceToken token) {
  const WorkerShutdown default_behavior = BLOCK_SHUTDOWN;
  return GetSequencedTaskRunnerWithShutdownBehavior(token, default_behavior);
}
// Creates a new sequenced task runner that posts to this pool under |token|
// with |shutdown_behavior|.
scoped_refptr<SequencedTaskRunner>
SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
    SequenceToken token, WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolSequencedTaskRunner(
      this,
      token,
      shutdown_behavior);
}
// Creates a new unsequenced task runner that posts to this pool with
// |shutdown_behavior|.
scoped_refptr<TaskRunner>
SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
    WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolTaskRunner(
      this,
      shutdown_behavior);
}
// Posts an unsequenced, undelayed BLOCK_SHUTDOWN task.
bool SequencedWorkerPool::PostWorkerTask(
    const tracked_objects::Location& from_here,
    const Closure& task) {
  const SequenceToken unsequenced = SequenceToken();
  const TimeDelta no_delay = TimeDelta();
  return inner_->PostTask(NULL, unsequenced, BLOCK_SHUTDOWN,
                          from_here, task, no_delay);
}
// Posts an unsequenced task to run after |delay|. A zero delay yields a
// BLOCK_SHUTDOWN task; any other delay yields a SKIP_ON_SHUTDOWN task.
bool SequencedWorkerPool::PostDelayedWorkerTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  WorkerShutdown shutdown_behavior = SKIP_ON_SHUTDOWN;
  if (delay == TimeDelta())
    shutdown_behavior = BLOCK_SHUTDOWN;
  const SequenceToken unsequenced = SequenceToken();
  return inner_->PostTask(NULL, unsequenced, shutdown_behavior,
                          from_here, task, delay);
}
// Posts an unsequenced, undelayed task with the caller's |shutdown_behavior|.
bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  const SequenceToken unsequenced = SequenceToken();
  const TimeDelta no_delay = TimeDelta();
  return inner_->PostTask(NULL, unsequenced, shutdown_behavior,
                          from_here, task, no_delay);
}
// Posts an undelayed BLOCK_SHUTDOWN task on |sequence_token|.
bool SequencedWorkerPool::PostSequencedWorkerTask(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  const TimeDelta no_delay = TimeDelta();
  return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
                          from_here, task, no_delay);
}
// Posts a task on |sequence_token| to run after |delay|. A zero delay yields
// a BLOCK_SHUTDOWN task; any other delay yields a SKIP_ON_SHUTDOWN task.
bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  WorkerShutdown shutdown_behavior = SKIP_ON_SHUTDOWN;
  if (delay == TimeDelta())
    shutdown_behavior = BLOCK_SHUTDOWN;
  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
                          from_here, task, delay);
}
// Posts an undelayed BLOCK_SHUTDOWN task on the sequence named |token_name|,
// which must not be empty. The name is resolved to a token inside PostTask().
bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
    const std::string& token_name,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  DCHECK(!token_name.empty());
  const SequenceToken resolved_later = SequenceToken();
  const TimeDelta no_delay = TimeDelta();
  return inner_->PostTask(&token_name, resolved_later, BLOCK_SHUTDOWN,
                          from_here, task, no_delay);
}
// Posts an undelayed task on |sequence_token| with the caller's
// |shutdown_behavior|.
bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  const TimeDelta no_delay = TimeDelta();
  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
                          from_here, task, no_delay);
}
// Posting through this entry point is the same as posting an unsequenced
// delayed worker task.
bool SequencedWorkerPool::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  const bool posted = PostDelayedWorkerTask(from_here, task, delay);
  return posted;
}
// True when the calling thread is one of this pool's worker threads.
bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
  const bool on_worker_thread = inner_->RunsTasksOnCurrentThread();
  return on_worker_thread;
}
// True when the calling thread is a worker currently recorded as running
// |sequence_token|.
bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
    SequenceToken sequence_token) const {
  const bool in_sequence =
      inner_->IsRunningSequenceOnCurrentThread(sequence_token);
  return in_sequence;
}
void SequencedWorkerPool::FlushForTesting() {
inner_->CleanupForTesting();
}
void SequencedWorkerPool::SignalHasWorkForTesting() {
inner_->SignalHasWorkForTesting();
}
// Shuts the pool down; see Inner::Shutdown(). Expected to be called on the
// thread whose message loop the pool was constructed on (debug-checked).
void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
  DCHECK(constructor_message_loop_->BelongsToCurrentThread());

  inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
}
// True once Shutdown() has been called on this pool.
bool SequencedWorkerPool::IsShutdownInProgress() {
  const bool shutting_down = inner_->IsShutdownInProgress();
  return shutting_down;
}
}