/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes the following definitions.
- de265_thread_create
- de265_thread_join
- de265_thread_destroy
- de265_mutex_init
- de265_mutex_destroy
- de265_mutex_lock
- de265_mutex_unlock
- de265_cond_init
- de265_cond_destroy
- de265_cond_broadcast
- de265_cond_wait
- de265_cond_signal
- de265_thread_create
- de265_thread_join
- de265_thread_destroy
- de265_mutex_init
- de265_mutex_destroy
- de265_mutex_lock
- de265_mutex_unlock
- de265_cond_init
- de265_cond_destroy
- de265_cond_broadcast
- de265_cond_wait
- de265_cond_signal
- wait_for_progress
- set_progress
- get_progress
- printblks
- worker_thread
- start_thread_pool
- stop_thread_pool
- add_task
/*
* H.265 video codec.
* Copyright (c) 2013-2014 struktur AG, Dirk Farin <farin@struktur.de>
*
* This file is part of libde265.
*
* libde265 is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* libde265 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libde265. If not, see <http://www.gnu.org/licenses/>.
*/
#include "threads.h"
#include <assert.h>
#include <string.h>
#if defined(_MSC_VER) || defined(__MINGW32__)
# include <malloc.h>
#else
# include <alloca.h>
#endif
#ifndef _WIN32
// #include <intrin.h>
// Return type and parameter type of a thread entry point on the pthread
// (non-Win32) build; worker_thread() below is declared with these.
#define THREAD_RESULT void*
#define THREAD_PARAM void*
// Start a new thread running start_routine(arg) with default attributes.
// The thread id is stored in *t; the pthread_create() status is returned
// unchanged (0 on success).
int de265_thread_create(de265_thread* t, void *(*start_routine) (void *), void *arg)
{
  const int status = pthread_create(t, NULL, start_routine, arg);
  return status;
}
// Block until thread t has terminated. The thread's exit value is not
// retrieved and the pthread_join() status is ignored.
void de265_thread_join(de265_thread t)
{
  (void)pthread_join(t, NULL);
}
// Intentionally a no-op on the pthread build: this function performs no
// cleanup for the thread handle.
void de265_thread_destroy(de265_thread* t)
{
  (void)t;
}
// Initialise *m as a mutex with default attributes.
// The pthread_mutex_init() status is ignored.
void de265_mutex_init(de265_mutex* m)
{
  (void)pthread_mutex_init(m, NULL);
}
// Destroy the mutex *m. The pthread_mutex_destroy() status is ignored.
void de265_mutex_destroy(de265_mutex* m)
{
  (void)pthread_mutex_destroy(m);
}
// Acquire the mutex *m, blocking until it is available.
// The pthread_mutex_lock() status is ignored.
void de265_mutex_lock(de265_mutex* m)
{
  (void)pthread_mutex_lock(m);
}
// Release the mutex *m. The pthread_mutex_unlock() status is ignored.
void de265_mutex_unlock(de265_mutex* m)
{
  (void)pthread_mutex_unlock(m);
}
// Initialise *c as a condition variable with default attributes.
// The pthread_cond_init() status is ignored.
void de265_cond_init(de265_cond* c)
{
  (void)pthread_cond_init(c, NULL);
}
// Destroy the condition variable *c.
// The pthread_cond_destroy() status is ignored.
void de265_cond_destroy(de265_cond* c)
{
  (void)pthread_cond_destroy(c);
}
// Wake every thread currently waiting on *c.
// The mutex argument is not used by the pthread implementation; it is part
// of the signature shared with the Win32 variant, which does use it.
void de265_cond_broadcast(de265_cond* c,de265_mutex* m)
{
  (void)m;
  (void)pthread_cond_broadcast(c);
}
// Wait on condition variable *c using mutex *m (see pthread_cond_wait()).
// The call's status is ignored.
void de265_cond_wait(de265_cond* c,de265_mutex* m)
{
  (void)pthread_cond_wait(c, m);
}
// Signal condition variable *c (see pthread_cond_signal()).
// The call's status is ignored.
void de265_cond_signal(de265_cond* c)
{
  (void)pthread_cond_signal(c);
}
#else // _WIN32
// Return type (including calling convention) and parameter type of a thread
// entry point on the Win32 build; worker_thread() below is declared with these.
#define THREAD_RESULT DWORD WINAPI
#define THREAD_PARAM LPVOID
// Start a new thread running start_routine(arg) via CreateThread().
// On success the thread handle is stored in *t and 0 is returned.
// On failure *t is left untouched and -1 is returned.
int de265_thread_create(de265_thread* t, LPTHREAD_START_ROUTINE start_routine, void *arg)
{
  HANDLE created = CreateThread(NULL, 0, start_routine, arg, 0, NULL);

  if (created != NULL) {
    *t = created;
    return 0;
  }

  return -1;
}
// Block, without timeout, until the thread handle t becomes signalled.
// The wait result is ignored.
void de265_thread_join(de265_thread t)
{
  (void)WaitForSingleObject(t, INFINITE);
}
// Close the thread handle stored in *t and reset *t to NULL.
// The CloseHandle() result is ignored.
void de265_thread_destroy(de265_thread* t)
{
  (void)CloseHandle(*t);
  *t = NULL;
}
// Create an unnamed, initially unowned Win32 mutex with default security
// attributes and store its handle in *m. The handle is stored as returned;
// a failed creation is not detected here.
void de265_mutex_init(de265_mutex* m)
{
  *m = CreateMutex(NULL, FALSE, NULL);
}
// Close the mutex handle stored in *m. The CloseHandle() result is ignored
// and *m itself is left unchanged.
void de265_mutex_destroy(de265_mutex* m)
{
  (void)CloseHandle(*m);
}
// Acquire the mutex *m by waiting on its handle without timeout.
// The wait result is ignored.
void de265_mutex_lock(de265_mutex* m)
{
  (void)WaitForSingleObject(*m, INFINITE);
}
// Release ownership of the mutex *m. The ReleaseMutex() result is ignored.
void de265_mutex_unlock(de265_mutex* m)
{
  (void)ReleaseMutex(*m);
}
// Initialise the condition variable *c; forwards to win32_cond_init().
void de265_cond_init(de265_cond* c)
{
  win32_cond_init(c);
}
// Destroy the condition variable *c; forwards to win32_cond_destroy().
void de265_cond_destroy(de265_cond* c)
{
  win32_cond_destroy(c);
}
// Wake the threads waiting on *c via win32_cond_broadcast().
// Unlike the pthread variant, this implementation brackets the broadcast
// with a lock/unlock of *m.
void de265_cond_broadcast(de265_cond* c,de265_mutex* m)
{
  de265_mutex_lock(m);      // taken for the duration of the broadcast

  win32_cond_broadcast(c);

  de265_mutex_unlock(m);
}
// Wait on condition variable *c with mutex *m; forwards to win32_cond_wait().
void de265_cond_wait(de265_cond* c,de265_mutex* m)
{
  win32_cond_wait(c, m);
}
// Signal condition variable *c; forwards to win32_cond_signal().
void de265_cond_signal(de265_cond* c)
{
  win32_cond_signal(c);
}
#endif // _WIN32
// Construct a progress lock: the progress value starts at 0 and the
// mutex / condition variable pair guarding it is initialised.
de265_progress_lock::de265_progress_lock()
  : mProgress(0)
{
  de265_mutex_init(&mutex);
  de265_cond_init(&cond);
}
// Release the synchronisation primitives created by the constructor
// (mutex first, then the condition variable).
de265_progress_lock::~de265_progress_lock()
{
  de265_mutex_destroy(&mutex);

  de265_cond_destroy(&cond);
}
// Block the caller until the stored progress value is at least `progress`.
// Returns immediately when that is already the case.
void de265_progress_lock::wait_for_progress(int progress)
{
  // Fast path: the first check is made without taking the mutex.
  // NOTE(review): this is an unsynchronised read of mProgress — confirm
  // that its type/usage makes this acceptable.
  if (mProgress < progress) {
    de265_mutex_lock(&mutex);

    // Re-check under the lock; keep sleeping on the condition variable
    // until the target has been reached.
    for (;;) {
      if (mProgress >= progress) {
        break;
      }

      de265_cond_wait(&cond, &mutex);
    }

    de265_mutex_unlock(&mutex);
  }
}
// Raise the stored progress value to `progress` and wake all waiters.
// A value that is not greater than the current one is ignored, so the
// stored progress never decreases through this call.
void de265_progress_lock::set_progress(int progress)
{
  de265_mutex_lock(&mutex);

  const bool advanced = (progress > mProgress);

  if (advanced) {
    mProgress = progress;

    // broadcast while still holding the mutex
    de265_cond_broadcast(&cond, &mutex);
  }

  de265_mutex_unlock(&mutex);
}
// Return the current progress value. The value is read without taking
// the mutex.
int de265_progress_lock::get_progress() const
{
  const int current = mProgress;
  return current;
}
#include "libde265/decctx.h"
#if 0
// Row of dashes used to draw the top and bottom border of the grid below.
const char* line="--------------------------------------------------";
// Debug helper (disabled by the enclosing #if 0): prints the number of working
// threads and the queue length, followed by a w x h character grid in which
// every queued task and every entry of pool->ctbx / pool->ctby is marked.
// NOTE(review): the field accesses used here (pool->tasks[i].data.task_ctb,
// pool->num_tasks, pool->ctbx/ctby) differ from how the pool is used by the
// live code in this file — presumably stale; confirm before re-enabling.
void printblks(const thread_pool* pool)
{
// grid dimensions, taken from the SPS referenced by the first queued task
int w = pool->tasks[0].data.task_ctb.ctx->current_sps->PicWidthInCtbsY;
int h = pool->tasks[0].data.task_ctb.ctx->current_sps->PicHeightInCtbsY;
printf("active threads: %d queue len: %d\n",pool->num_threads_working,pool->num_tasks);
// stack-allocated grid buffer of w*h cells, initially all blanks
char *const p = (char *)alloca(w * h * sizeof(char));
assert(p != NULL);
memset(p,' ',w*h);
// mark the position of every queued task; b is fixed at 0, so the mark is '0'
for (int i=0;i<pool->num_tasks;i++) {
int b = 0; //pool->tasks[i].num_blockers;
int x = pool->tasks[i].data.task_ctb.ctb_x;
int y = pool->tasks[i].data.task_ctb.ctb_y;
p[y*w+x] = b+'0';
}
// mark the positions recorded in ctbx/ctby for each working thread with '*'
for (int i=0;i<pool->num_threads_working;i++) {
int x = pool->ctbx[i];
int y = pool->ctby[i];
p[y*w+x] = '*';
}
// top border: the tail of `line` starting at offset 50-w
// (assumes `line` holds 50 characters and w <= 50 — TODO confirm)
printf("+%s+\n",line+50-w);
// grid rows, each framed by '|'
for (int y=0;y<h;y++)
{
printf("|");
for (int x=0;x<w;x++)
{
printf("%c",p[x+y*w]);
}
printf("|\n");
}
// bottom border, same construction as the top border
printf("+%s+\n",line+50-w);
}
#endif
// Entry point of every pool thread.
//
// pool_ptr is the thread_pool this worker belongs to. The worker repeatedly
// takes the task at the front of pool->tasks and runs its work() method,
// sleeping on pool->cond_var while the queue is empty. It returns NULL once
// pool->stopped is observed; tasks still queued at that point are not run
// by this worker.
//
// pool->mutex is held whenever pool state is inspected or modified and is
// released while a task is executing.
static THREAD_RESULT worker_thread(THREAD_PARAM pool_ptr)
{
  thread_pool* pool = (thread_pool*)pool_ptr;

  de265_mutex_lock(&pool->mutex);

  for (;;) {
    // Stay idle as long as the pool is running and there is nothing queued.
    while (!pool->stopped && pool->tasks.size()==0) {
      de265_cond_wait(&pool->cond_var, &pool->mutex);
    }

    // A stop request takes priority over any task that may be queued.
    if (pool->stopped) {
      break;
    }

    // Dequeue the next task and account for this thread being busy.
    thread_task* task = pool->tasks.front();
    pool->tasks.pop_front();
    pool->num_threads_working++;

    // Run the task without holding the pool mutex.
    de265_mutex_unlock(&pool->mutex);
    task->work();
    de265_mutex_lock(&pool->mutex);

    pool->num_threads_working--;
  }

  de265_mutex_unlock(&pool->mutex);

  return NULL;
}
// Initialise `pool` and start up to `num_threads` worker threads.
//
// Requests above MAX_THREADS are clamped to MAX_THREADS.
//
// Returns
//   DE265_OK                                   all requested threads started,
//   DE265_WARNING_NUMBER_OF_THREADS_LIMITED_TO_MAXIMUM
//                                              started, but count was clamped,
//   DE265_ERROR_CANNOT_START_THREADPOOL        a thread could not be created;
//                                              threads started before the
//                                              failure keep running and are
//                                              counted in pool->num_threads.
de265_error start_thread_pool(thread_pool* pool, int num_threads)
{
  de265_error result = DE265_OK;

  if (num_threads > MAX_THREADS) {
    num_threads = MAX_THREADS;
    result = DE265_WARNING_NUMBER_OF_THREADS_LIMITED_TO_MAXIMUM;
  }

  // Counts the threads actually created; incremented in the loop below.
  pool->num_threads = 0;

  de265_mutex_init(&pool->mutex);
  de265_cond_init(&pool->cond_var);

  // Reset the shared pool state under the freshly created mutex.
  de265_mutex_lock(&pool->mutex);
  pool->num_threads_working = 0;
  pool->stopped = false;
  de265_mutex_unlock(&pool->mutex);

  for (int idx = 0; idx < num_threads; idx++) {
    const int status = de265_thread_create(&pool->thread[idx], worker_thread, pool);

    if (status != 0) {
      return DE265_ERROR_CANNOT_START_THREADPOOL;
    }

    pool->num_threads++;
  }

  return result;
}
// Shut the pool down: set the stop flag, wake every worker, wait for each of
// the pool->num_threads workers to finish, then destroy the pool's mutex and
// condition variable.
void stop_thread_pool(thread_pool* pool)
{
  // Publish the stop request under the pool mutex.
  de265_mutex_lock(&pool->mutex);
  pool->stopped = true;
  de265_mutex_unlock(&pool->mutex);

  // Wake all workers so that idle ones notice the flag.
  de265_cond_broadcast(&pool->cond_var, &pool->mutex);

  // Join and release each worker in creation order.
  for (int idx = 0; idx < pool->num_threads; idx++) {
    de265_thread_join(pool->thread[idx]);
    de265_thread_destroy(&pool->thread[idx]);
  }

  de265_mutex_destroy(&pool->mutex);
  de265_cond_destroy(&pool->cond_var);
}
// Append `task` to the pool's queue and wake one worker.
// If the pool has already been stopped the task is silently not queued.
void add_task(thread_pool* pool, thread_task* task)
{
  de265_mutex_lock(&pool->mutex);

  if (pool->stopped) {
    de265_mutex_unlock(&pool->mutex);
    return;
  }

  pool->tasks.push_back(task);

  // one new task -> one wake-up is enough
  de265_cond_signal(&pool->cond_var);

  de265_mutex_unlock(&pool->mutex);
}
// Declarations only; the bodies of these two helpers are not in this file.
// NOTE(review): presumably these request an out-of-line definition of inline
// helpers declared in threads.h (C99 'extern inline' semantics) — confirm,
// since the rest of this file uses C++ constructs.
extern inline int de265_sync_sub_and_fetch(de265_sync_int* cnt, int n);
extern inline int de265_sync_add_and_fetch(de265_sync_int* cnt, int n);