This source file includes the following definitions:
- halide_set_num_threads
- halide_shutdown_thread_pool
namespace Halide { namespace Runtime { namespace Internal {
// One parallel-for job: a half-open range of task indices [next, max) that
// threads claim one index at a time.
struct work {
    // Link to the job below this one on work_queue.jobs. Jobs are pushed and
    // popped at the head, so the queue behaves as a stack.
    work *next_job;
    // Task body; handed to halide_do_task together with user_context, a task
    // index and closure.
    int (*f)(void *, int, uint8_t *);
    void *user_context;
    // Indices still to be claimed are [next, max).
    int next, max;
    uint8_t *closure;
    // Number of threads currently executing a task claimed from this job.
    int active_workers;
    // Nonzero result of a failing task (a later failure overwrites an
    // earlier one); stays 0 if every task succeeded.
    int exit_status;
    // A job is live while it has unclaimed tasks or tasks still executing.
    // Marked const because it only inspects state.
    bool running() const { return next < max || active_workers > 0; }
};
// Upper bound on the pool size; also the capacity of work_queue_t::threads.
#define MAX_THREADS 64

// Shared state of the thread pool. The fields are normally accessed with
// `mutex` held.
struct work_queue_t {
    // Protects the other fields of this struct.
    halide_mutex mutex;
    // Stack of jobs that still have unclaimed tasks; the head is the most
    // recently enqueued job.
    work *jobs;
    // a_team_size: threads not currently parked on wakeup_b_team.
    // target_a_team_size: how many threads the current workload wants active.
    int a_team_size, target_a_team_size;
    // Broadcast when a job completes so the thread waiting on it can return.
    halide_cond wakeup_owners;
    // Idle threads in the active ("A") team wait here for new jobs.
    halide_cond wakeup_a_team;
    // Surplus idle threads wait here until the A team needs to grow.
    halide_cond wakeup_b_team;
    // Handles of spawned worker threads, joined at shutdown.
    halide_thread *threads[MAX_THREADS];
    // Number of valid entries in threads[].
    int threads_created;
    // Requested pool size including the calling thread; 0 means "not set".
    int desired_num_threads;
    // shutdown: pool workers should leave their loop.
    // initialized: set up lazily on first parallel loop, cleared at shutdown.
    bool shutdown, initialized;
    // Pool workers keep looping while this returns true.
    bool running() const {
        return !shutdown;
    }
};
// The process-wide thread pool state.
WEAK work_queue_t work_queue;
// Default task runner: invokes task f for index idx directly on the calling
// thread and hands back whatever f returns.
WEAK int default_do_task(void *user_context, halide_task_t f, int idx,
                         uint8_t *closure) {
    const int status = f(user_context, idx, closure);
    return status;
}
// Restricts a requested thread count to the supported range [1, MAX_THREADS].
WEAK int clamp_num_threads(int desired_num_threads) {
    if (desired_num_threads < 1) {
        return 1;
    }
    if (desired_num_threads > MAX_THREADS) {
        return MAX_THREADS;
    }
    return desired_num_threads;
}
// Picks the thread count to use when none was requested explicitly.
// HL_NUM_THREADS is consulted first, then the alternate spelling
// HL_NUMTHREADS; without either, the host CPU count is used. The result is
// returned unclamped (an unparsable variable yields atoi's 0).
WEAK int default_desired_num_threads() {
    const char *env_value = getenv("HL_NUM_THREADS");
    if (env_value == NULL) {
        env_value = getenv("HL_NUMTHREADS");
    }
    return env_value != NULL ? atoi(env_value) : halide_host_cpu_count();
}
// Scheduling loop shared by pool workers and by threads that called
// default_do_par_for. Must be entered with work_queue.mutex held; the mutex is
// still held on return, but it is released while each task executes.
//
// owned_job == NULL: a pool worker; loop until the queue is shut down.
// owned_job != NULL: the thread that enqueued owned_job; loop until that job
//   has no unclaimed tasks and no active workers. While waiting, this thread
//   also claims tasks from whichever job is at the head of the queue.
WEAK void worker_thread_already_locked(work *owned_job) {
    while (owned_job != NULL ? owned_job->running()
                             : work_queue.running()) {
        if (work_queue.jobs == NULL) {
            // Nothing to claim: block on the condition that fits this thread.
            if (owned_job) {
                // Owner: sleep until a worker reports that a job finished.
                halide_cond_wait(&work_queue.wakeup_owners, &work_queue.mutex);
            } else if (work_queue.a_team_size <= work_queue.target_a_team_size) {
                // The A team is not oversized: stay in it and wait for jobs.
                halide_cond_wait(&work_queue.wakeup_a_team, &work_queue.mutex);
            } else {
                // Too many active threads: leave the A team while parked on
                // wakeup_b_team, and count back in once woken.
                work_queue.a_team_size--;
                halide_cond_wait(&work_queue.wakeup_b_team, &work_queue.mutex);
                work_queue.a_team_size++;
            }
        } else {
            // Claim one task (index job->next) from the job at the head.
            work *job = work_queue.jobs;
            // Snapshot the fields needed to run the task outside the lock.
            work myjob = *job;
            job->next++;
            // Pop the job once its last task has been claimed.
            // NOTE(review): the test is ==, so this assumes job->next started
            // below job->max; a job enqueued with next >= max is never popped.
            if (job->next == job->max) {
                work_queue.jobs = job->next_job;
            }
            // Keeps job->running() true while this task executes, even when no
            // unclaimed tasks remain, so the owner does not return early.
            job->active_workers++;
            halide_mutex_unlock(&work_queue.mutex);
            int result = halide_do_task(myjob.user_context, myjob.f, myjob.next,
                                        myjob.closure);
            halide_mutex_lock(&work_queue.mutex);
            // Record a failure; a later failure overwrites an earlier one.
            if (result) {
                job->exit_status = result;
            }
            job->active_workers--;
            // If that was the job's last outstanding work and the job belongs
            // to some other thread, wake the waiting owners so it can return.
            // An owner finishing its own job notices via its loop condition.
            if (!job->running() && job != owned_job) {
                halide_cond_broadcast(&work_queue.wakeup_owners);
            }
        }
    }
}
// Entry point of each spawned pool thread. A pool thread owns no job, so it
// stays in the scheduling loop until the work queue is shut down.
WEAK void worker_thread(void *) {
    work *const no_owned_job = NULL;
    halide_mutex_lock(&work_queue.mutex);
    worker_thread_already_locked(no_owned_job);
    halide_mutex_unlock(&work_queue.mutex);
}
// Default parallel-for: runs task f (via halide_do_task) for every index in
// [min, min + size) on the thread pool, with the calling thread helping.
// Lazily initializes the pool and spawns workers as needed.
// Returns 0 if every task returned 0; otherwise a nonzero task result.
WEAK int default_do_par_for(void *user_context, halide_task_t f,
                            int min, int size, uint8_t *closure) {
    // An empty or negative range has no tasks and must not be enqueued.
    // Otherwise this thread would return at once (job.running() is false from
    // the start) and leave work_queue.jobs pointing into a dead stack frame.
    // Workers, which pop a job only when next == max after claiming, would
    // keep claiming out-of-range indices from it.
    if (size <= 0) {
        return 0;
    }

    halide_mutex_lock(&work_queue.mutex);

    // Set up the pool on first use, or on first use after a shutdown.
    if (!work_queue.initialized) {
        work_queue.shutdown = false;
        halide_cond_init(&work_queue.wakeup_owners);
        halide_cond_init(&work_queue.wakeup_a_team);
        halide_cond_init(&work_queue.wakeup_b_team);
        work_queue.jobs = NULL;
        if (!work_queue.desired_num_threads) {
            work_queue.desired_num_threads = default_desired_num_threads();
        }
        work_queue.desired_num_threads = clamp_num_threads(work_queue.desired_num_threads);
        work_queue.threads_created = 0;
        // Count the calling thread. Each worker spawned below adds one, so
        // right after initialization a_team_size == desired_num_threads.
        work_queue.a_team_size = 1;
        work_queue.initialized = true;
    }

    // Top the pool up to desired_num_threads - 1 workers; the caller is the
    // remaining thread. This also runs after initialization when
    // halide_set_num_threads has raised the count. New workers start in the
    // A team, so count them there to keep the A/B-team bookkeeping accurate.
    while (work_queue.threads_created < work_queue.desired_num_threads - 1) {
        work_queue.a_team_size++;
        work_queue.threads[work_queue.threads_created++] =
            halide_spawn_thread(worker_thread, NULL);
    }

    // The job lives in this stack frame. That is safe because this function
    // does not return until the job has no unclaimed tasks and no workers.
    work job;
    job.f = f;
    job.user_context = user_context;
    job.next = min;
    job.max = min + size;
    job.closure = closure;
    job.exit_status = 0;
    job.active_workers = 0;

    // If no other job has unclaimed tasks and this one has fewer tasks than
    // threads, only `size` threads need to stay active; else keep them all.
    if (!work_queue.jobs && size < work_queue.desired_num_threads) {
        work_queue.target_a_team_size = size;
    } else {
        work_queue.target_a_team_size = work_queue.desired_num_threads;
    }

    // Push the job onto the stack and wake threads to claim its tasks,
    // including parked B-team threads if the A team is too small.
    job.next_job = work_queue.jobs;
    work_queue.jobs = &job;
    halide_cond_broadcast(&work_queue.wakeup_a_team);
    if (work_queue.target_a_team_size > work_queue.a_team_size) {
        halide_cond_broadcast(&work_queue.wakeup_b_team);
    }

    // Help run tasks until this job is complete.
    worker_thread_already_locked(&job);
    // Read the result while the lock is still held.
    int exit_status = job.exit_status;
    halide_mutex_unlock(&work_queue.mutex);
    return exit_status;
}
}}}
using namespace Halide::Runtime::Internal;
extern "C" {
// Sets the desired thread-pool size, clamped to [1, MAX_THREADS]. Passing 0
// selects the default (environment variable or host CPU count). A negative n
// is reported through halide_error; execution then continues and the value
// is clamped. Returns the previous desired size, which is 0 if it was never set.
WEAK int halide_set_num_threads(int n) {
    if (n < 0) {
        halide_error(NULL, "halide_set_num_threads: must be >= 0.");
    }
    // Update under the queue lock, which also guards the pool's init/spawn logic.
    halide_mutex_lock(&work_queue.mutex);
    const int requested = (n == 0) ? default_desired_num_threads() : n;
    const int previous = work_queue.desired_num_threads;
    work_queue.desired_num_threads = clamp_num_threads(requested);
    halide_mutex_unlock(&work_queue.mutex);
    return previous;
}
// Stops and joins all pool threads and releases the synchronization
// primitives. Does nothing if the pool is not initialized. A later parallel
// loop will initialize the pool again.
WEAK void halide_shutdown_thread_pool() {
    if (work_queue.initialized) {
        halide_mutex_lock(&work_queue.mutex);
        work_queue.shutdown = true;
        // Wake every waiting thread so each re-checks its loop condition;
        // pool workers then see work_queue.running() == false and leave.
        halide_cond_broadcast(&work_queue.wakeup_owners);
        halide_cond_broadcast(&work_queue.wakeup_a_team);
        halide_cond_broadcast(&work_queue.wakeup_b_team);
        halide_mutex_unlock(&work_queue.mutex);

        for (int t = 0; t < work_queue.threads_created; ++t) {
            halide_join_thread(work_queue.threads[t]);
        }

        halide_mutex_destroy(&work_queue.mutex);
        halide_cond_destroy(&work_queue.wakeup_owners);
        halide_cond_destroy(&work_queue.wakeup_a_team);
        halide_cond_destroy(&work_queue.wakeup_b_team);
        work_queue.initialized = false;
    }
}
}