This source file includes following definitions.
- user_work_function_
- Setup
- DecCounter
- PostExitAndJoinAll
- WorkLoop
- WorkerThreadEntry
- DispatchMany
- DispatchHere
- Dispatch
#include "sdk_util/thread_pool.h"
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include "sdk_util/auto_lock.h"
namespace sdk_util {
ThreadPool::ThreadPool(int num_threads)
: threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false),
user_data_(NULL), user_work_function_(NULL) {
if (num_threads_ > 0) {
int status;
status = sem_init(&work_sem_, 0, 0);
if (-1 == status) {
fprintf(stderr, "Failed to initialize semaphore!\n");
exit(-1);
}
status = sem_init(&done_sem_, 0, 0);
if (-1 == status) {
fprintf(stderr, "Failed to initialize semaphore!\n");
exit(-1);
}
threads_ = new pthread_t[num_threads_];
for (int i = 0; i < num_threads_; i++) {
status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this);
if (0 != status) {
fprintf(stderr, "Failed to create thread!\n");
exit(-1);
}
}
}
}
ThreadPool::~ThreadPool() {
if (num_threads_ > 0) {
PostExitAndJoinAll();
delete[] threads_;
sem_destroy(&done_sem_);
sem_destroy(&work_sem_);
}
}
void ThreadPool::Setup(int counter, WorkFunction work, void *data) {
counter_ = counter;
user_work_function_ = work;
user_data_ = data;
}
int ThreadPool::DecCounter() {
return AtomicAddFetch(&counter_, -1);
}
void ThreadPool::PostExitAndJoinAll() {
exiting_ = true;
for (int i = 0; i < num_threads_; ++i)
sem_post(&work_sem_);
void* retval;
for (int i = 0; i < num_threads_; ++i)
pthread_join(threads_[i], &retval);
}
void ThreadPool::WorkLoop() {
while (true) {
sem_wait(&work_sem_);
if (exiting_) break;
while (true) {
int task_index = DecCounter();
if (task_index < 0)
break;
user_work_function_(task_index, user_data_);
}
sem_post(&done_sem_);
}
}
void* ThreadPool::WorkerThreadEntry(void* thiz) {
static_cast<ThreadPool*>(thiz)->WorkLoop();
return NULL;
}
void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) {
Setup(num_tasks, work, data);
for (int i = 0; i < num_threads_; i++)
sem_post(&work_sem_);
for (int i = 0; i < num_threads_; i++)
sem_wait(&done_sem_);
}
void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) {
for (int i = 0; i < num_tasks; i++)
work(i, data);
}
void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) {
if (num_threads_ > 0)
DispatchMany(num_tasks, work, data);
else
DispatchHere(num_tasks, work, data);
}
}