This source file includes the following definitions.
- de265_thread_create
- de265_thread_join
- de265_thread_destroy
- de265_mutex_init
- de265_mutex_destroy
- de265_mutex_lock
- de265_mutex_unlock
- de265_cond_init
- de265_cond_destroy
- de265_cond_broadcast
- de265_cond_wait
- de265_cond_signal
- de265_thread_create
- de265_thread_join
- de265_thread_destroy
- de265_mutex_init
- de265_mutex_destroy
- de265_mutex_lock
- de265_mutex_unlock
- de265_cond_init
- de265_cond_destroy
- de265_cond_broadcast
- de265_cond_wait
- de265_cond_signal
- wait_for_progress
- set_progress
- increase_progress
- get_progress
- printblks
- worker_thread
- start_thread_pool
- stop_thread_pool
- add_task
#include "threads.h"
#include <assert.h>
#include <string.h>
#if defined(_MSC_VER) || defined(__MINGW32__)
# include <malloc.h>
#elif defined(HAVE_ALLOCA_H)
# include <alloca.h>
#endif
#ifndef _WIN32
#define THREAD_RESULT void*
#define THREAD_PARAM void*
#include <stdio.h>
/* POSIX: start a thread that runs start_routine(arg) and store its id in *t.
   Returns whatever pthread_create() returns (0 means success). */
int de265_thread_create(de265_thread* t, void *(*start_routine) (void *), void *arg)
{
  const int rc = pthread_create(t, NULL, start_routine, arg);
  return rc;
}
/* POSIX: block until thread t has finished; its return value is discarded. */
void de265_thread_join(de265_thread t)
{
  pthread_join(t, NULL);
}
/* POSIX: intentionally a no-op; this variant releases nothing for *t. */
void de265_thread_destroy(de265_thread* t)
{
  (void)t;
}
/* POSIX: initialize *m with default mutex attributes. */
void de265_mutex_init(de265_mutex* m)
{
  pthread_mutex_init(m, NULL);
}
/* POSIX: destroy the mutex *m. */
void de265_mutex_destroy(de265_mutex* m)
{
  pthread_mutex_destroy(m);
}
/* POSIX: acquire the mutex *m, blocking until it is available. */
void de265_mutex_lock(de265_mutex* m)
{
  pthread_mutex_lock(m);
}
/* POSIX: release the mutex *m. */
void de265_mutex_unlock(de265_mutex* m)
{
  pthread_mutex_unlock(m);
}
/* POSIX: initialize *c with default condition-variable attributes. */
void de265_cond_init(de265_cond* c)
{
  pthread_cond_init(c, NULL);
}
/* POSIX: destroy the condition variable *c. */
void de265_cond_destroy(de265_cond* c)
{
  pthread_cond_destroy(c);
}
/* POSIX: wake every thread waiting on *c.
   The mutex argument is not used by this variant; it is part of the
   signature shared with the Win32 implementation. */
void de265_cond_broadcast(de265_cond* c,de265_mutex* m)
{
  (void)m;
  pthread_cond_broadcast(c);
}
/* POSIX: wait on *c using mutex *m (forwarded to pthread_cond_wait). */
void de265_cond_wait(de265_cond* c,de265_mutex* m)
{
  pthread_cond_wait(c, m);
}
/* POSIX: signal the condition variable *c (forwarded to pthread_cond_signal). */
void de265_cond_signal(de265_cond* c)
{
  pthread_cond_signal(c);
}
#else
#define THREAD_RESULT DWORD WINAPI
#define THREAD_PARAM LPVOID
/* Win32: start a thread that runs start_routine(arg).
   On success the thread handle is stored in *t and 0 is returned;
   if CreateThread() fails, *t is left untouched and -1 is returned. */
int de265_thread_create(de265_thread* t, LPTHREAD_START_ROUTINE start_routine, void *arg)
{
  HANDLE created = CreateThread(NULL, 0, start_routine, arg, 0, NULL);

  if (created != NULL) {
    *t = created;
    return 0;
  }

  return -1;
}
/* Win32: wait without timeout on the thread handle t. */
void de265_thread_join(de265_thread t)
{
  WaitForSingleObject(t, INFINITE);
}
/* Win32: close the thread handle and reset *t to NULL. */
void de265_thread_destroy(de265_thread* t)
{
  CloseHandle(*t);
  *t = NULL;
}
/* Win32: create an unnamed mutex object and store its handle in *m.
   The result of CreateMutex() is stored as-is, without a failure check. */
void de265_mutex_init(de265_mutex* m)
{
  *m = CreateMutex(NULL, FALSE, NULL);
}
/* Win32: close the mutex handle stored in *m. */
void de265_mutex_destroy(de265_mutex* m)
{
  CloseHandle(*m);
}
/* Win32: acquire the mutex *m by waiting on its handle without timeout. */
void de265_mutex_lock(de265_mutex* m)
{
  WaitForSingleObject(*m, INFINITE);
}
/* Win32: release the mutex *m. */
void de265_mutex_unlock(de265_mutex* m)
{
  ReleaseMutex(*m);
}
/* Win32: initialize *c via the win32_cond_init() helper. */
void de265_cond_init(de265_cond* c)
{
  win32_cond_init(c);
}
/* Win32: destroy *c via the win32_cond_destroy() helper. */
void de265_cond_destroy(de265_cond* c)
{
  win32_cond_destroy(c);
}
/* Win32: broadcast on *c via win32_cond_broadcast().
   Unlike the POSIX variant, this one acquires *m itself for the duration
   of the broadcast and releases it again afterwards. */
void de265_cond_broadcast(de265_cond* c,de265_mutex* m)
{
  de265_mutex_lock(m);

  win32_cond_broadcast(c);

  de265_mutex_unlock(m);
}
/* Win32: wait on *c using mutex *m (forwarded to win32_cond_wait). */
void de265_cond_wait(de265_cond* c,de265_mutex* m)
{
  win32_cond_wait(c, m);
}
/* Win32: signal *c via the win32_cond_signal() helper. */
void de265_cond_signal(de265_cond* c)
{
  win32_cond_signal(c);
}
#endif
// Start with a progress value of 0 and set up the mutex / condition
// variable pair used by the progress methods.
de265_progress_lock::de265_progress_lock()
  : mProgress(0)
{
  de265_mutex_init(&mutex);
  de265_cond_init(&cond);
}
// Release the synchronization primitives created by the constructor:
// first the mutex, then the condition variable.
de265_progress_lock::~de265_progress_lock()
{
  de265_mutex_destroy(&this->mutex);
  de265_cond_destroy(&this->cond);
}
// Block the caller until the stored progress value is at least 'progress'.
// Returns immediately when that is already the case.
void de265_progress_lock::wait_for_progress(int progress)
{
  // Fast path: this first comparison is made without taking the mutex.
  // NOTE(review): whether the unlocked read is safe depends on how mProgress
  // is declared, which is not visible here -- confirm.
  if (mProgress < progress) {
    de265_mutex_lock(&mutex);

    // Re-check under the lock on every wake-up; the broadcast only says
    // that the value changed, not that it reached our target.
    while (!(mProgress >= progress)) {
      de265_cond_wait(&cond, &mutex);
    }

    de265_mutex_unlock(&mutex);
  }
}
// Raise the stored progress to 'progress' and wake all waiters.
// A value that is not larger than the current one is ignored, so the
// stored progress never decreases through this method.
void de265_progress_lock::set_progress(int progress)
{
  de265_mutex_lock(&mutex);

  const bool advances = (progress > mProgress);
  if (advances) {
    mProgress = progress;

    // Broadcast while still holding the mutex.
    de265_cond_broadcast(&cond, &mutex);
  }

  de265_mutex_unlock(&mutex);
}
// Add 'progress' to the stored progress value and wake all waiters.
// The broadcast is issued unconditionally, while the mutex is held.
void de265_progress_lock::increase_progress(int progress)
{
  de265_mutex_lock(&mutex);

  mProgress += progress;
  de265_cond_broadcast(&cond, &mutex);

  de265_mutex_unlock(&mutex);
}
// Return the current progress value. The mutex is not taken for this read.
int de265_progress_lock::get_progress() const
{
  const int current = mProgress;
  return current;
}
#include "libde265/decctx.h"
#if 0
/* Print one horizontal border of the debug grid: '+', w dashes, '+'.
   Drawing the dashes in a loop works for any width; the previous
   fixed 50-character dash string was indexed with 'line+50-w', which
   points before the start of the string whenever w > 50. */
static void printblks_border(int w)
{
  putchar('+');
  for (int i=0;i<w;i++) {
    putchar('-');
  }
  printf("+\n");
}

/* Debug helper (currently compiled out by the surrounding '#if 0').
   Prints the number of working threads and the queue length, followed by
   a w x h character grid in which
     '0' marks the (ctb_x, ctb_y) position of every queued task and
     '*' marks the (ctbx[i], ctby[i]) position of every working thread.
   The grid size is taken from the SPS referenced by the first queued task.
   NOTE(review): pool->tasks[0] is read unconditionally -- presumably the
   caller guarantees a non-empty queue; confirm before re-enabling. */
void printblks(const thread_pool* pool)
{
  int w = pool->tasks[0].data.task_ctb.ctx->current_sps->PicWidthInCtbsY;
  int h = pool->tasks[0].data.task_ctb.ctx->current_sps->PicHeightInCtbsY;

  printf("active threads: %d  queue len: %d\n",pool->num_threads_working,pool->num_tasks);

  /* One character per grid cell, stack-allocated and blanked. */
  char *const p = (char *)alloca(w * h * sizeof(char));
  assert(p != NULL);
  memset(p,' ',w*h);

  /* Queued tasks. */
  for (int i=0;i<pool->num_tasks;i++) {
    int x = pool->tasks[i].data.task_ctb.ctb_x;
    int y = pool->tasks[i].data.task_ctb.ctb_y;
    p[y*w+x] = '0';
  }

  /* Positions currently being processed by worker threads. */
  for (int i=0;i<pool->num_threads_working;i++) {
    int x = pool->ctbx[i];
    int y = pool->ctby[i];
    p[y*w+x] = '*';
  }

  printblks_border(w);
  for (int y=0;y<h;y++)
    {
      printf("|");
      for (int x=0;x<w;x++)
        {
          printf("%c",p[x+y*w]);
        }
      printf("|\n");
    }
  printblks_border(w);
}
#endif
/* Entry point of every pool thread. pool_ptr is the owning thread_pool.
   The thread repeatedly takes the task at the front of pool->tasks and runs
   its work() method, until pool->stopped is set; it then returns NULL.
   pool->mutex is held at all times except while a task is executing. */
static THREAD_RESULT worker_thread(THREAD_PARAM pool_ptr)
{
  thread_pool* const owner = (thread_pool*)pool_ptr;

  de265_mutex_lock(&owner->mutex);

  for (;;) {
    // Sleep until a task is queued or the pool is being shut down.
    while (!owner->stopped && !(owner->tasks.size()>0)) {
      de265_cond_wait(&owner->cond_var, &owner->mutex);
    }

    // Shutdown takes precedence over tasks that are still queued.
    if (owner->stopped) {
      break;
    }

    // Dequeue the next task and account for this thread being busy.
    thread_task* job = owner->tasks.front();
    owner->tasks.pop_front();
    owner->num_threads_working++;

    // Run the task without holding the pool mutex.
    de265_mutex_unlock(&owner->mutex);
    job->work();
    de265_mutex_lock(&owner->mutex);

    owner->num_threads_working--;
  }

  de265_mutex_unlock(&owner->mutex);
  return NULL;
}
/* Initialize 'pool' and start its worker threads.
   num_threads is clamped to MAX_THREADS.
   Returns
     DE265_OK                                             on success,
     DE265_WARNING_NUMBER_OF_THREADS_LIMITED_TO_MAXIMUM   if the request was clamped,
     DE265_ERROR_CANNOT_START_THREADPOOL                  if a thread could not be created.
   In the error case the threads started so far keep running and remain
   counted in pool->num_threads. */
de265_error start_thread_pool(thread_pool* pool, int num_threads)
{
  de265_error status = DE265_OK;

  if (num_threads > MAX_THREADS) {
    num_threads = MAX_THREADS;
    status = DE265_WARNING_NUMBER_OF_THREADS_LIMITED_TO_MAXIMUM;
  }

  pool->num_threads = 0; // incremented below, once per successfully started thread

  de265_mutex_init(&pool->mutex);
  de265_cond_init(&pool->cond_var);

  // Reset the shared state under the pool mutex.
  de265_mutex_lock(&pool->mutex);
  pool->num_threads_working = 0;
  pool->stopped = false;
  de265_mutex_unlock(&pool->mutex);

  for (int idx=0; idx<num_threads; idx++) {
    if (de265_thread_create(&pool->thread[idx], worker_thread, pool) != 0) {
      return DE265_ERROR_CANNOT_START_THREADPOOL;
    }

    pool->num_threads++;
  }

  return status;
}
/* Shut the pool down: set the stop flag, wake all workers, join and destroy
   each of the pool->num_threads threads, then destroy the pool's mutex and
   condition variable. */
void stop_thread_pool(thread_pool* pool)
{
  // Publish the stop request under the mutex so workers see it when they
  // re-check their wait condition.
  de265_mutex_lock(&pool->mutex);
  pool->stopped = true;
  de265_mutex_unlock(&pool->mutex);

  // Issued after the unlock: the Win32 broadcast wrapper takes the mutex itself.
  de265_cond_broadcast(&pool->cond_var, &pool->mutex);

  int idx = 0;
  while (idx < pool->num_threads) {
    de265_thread_join(pool->thread[idx]);
    de265_thread_destroy(&pool->thread[idx]);
    idx++;
  }

  de265_mutex_destroy(&pool->mutex);
  de265_cond_destroy(&pool->cond_var);
}
/* Append 'task' to the pool's queue and signal the pool's condition variable.
   If the pool has already been stopped, the task is not queued and nothing
   is signalled. */
void add_task(thread_pool* pool, thread_task* task)
{
  de265_mutex_lock(&pool->mutex);

  const bool accepting = !pool->stopped;
  if (accepting) {
    pool->tasks.push_back(task);

    // Signal (not broadcast) once per added task.
    de265_cond_signal(&pool->cond_var);
  }

  de265_mutex_unlock(&pool->mutex);
}