/* This source file includes the following definitions:
 * - popCount
 * - m_id
 * - awaken
 * - threadMain
 * - tryWakeOne
 * - tryAcquireSleepingThread
 * - tryBondPeers
 * - allocThreadPools
 * - create
 * - start
 * - stopWorkers
 * - setCurrentThreadAffinity
 * - setThreadNodeAffinity
 * - getNumaNodeCount
 * - getCpuCount
 * - getFrameThreadsCount
 */
#include "common.h"
#include "threadpool.h"
#include "threading.h"
#include <new>
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
#include <winnt.h>
#endif
/* Atomic helpers for the 64-bit (x86_64) or native-width sleep/owner bitmaps.
 * SLEEPBITMAP_CTZ(id, x) stores the index of the lowest set bit of x in id;
 * SLEEPBITMAP_OR / SLEEPBITMAP_AND atomically modify *ptr and return the
 * previous value. */
#if X86_64
#ifdef __GNUC__
#define SLEEPBITMAP_CTZ(id, x)     id = (unsigned long)__builtin_ctzll(x)
#define SLEEPBITMAP_OR(ptr, mask)  __sync_fetch_and_or(ptr, mask)
#define SLEEPBITMAP_AND(ptr, mask) __sync_fetch_and_and(ptr, mask)
#elif defined(_MSC_VER)
/* The mask must be passed as a full 64-bit LONG64. Casting it to the 32-bit
 * LONG (as before) dropped bits 32..63, so OR was a no-op and AND never
 * cleared the bit for worker ids >= 32, and sign extension made OR of bit 31
 * set bits 31..63. */
#define SLEEPBITMAP_CTZ(id, x)     _BitScanForward64(&id, x)
#define SLEEPBITMAP_OR(ptr, mask)  InterlockedOr64((volatile LONG64*)(ptr), (LONG64)(mask))
#define SLEEPBITMAP_AND(ptr, mask) InterlockedAnd64((volatile LONG64*)(ptr), (LONG64)(mask))
#endif
#else
#define SLEEPBITMAP_CTZ CTZ
#define SLEEPBITMAP_OR  ATOMIC_OR
#define SLEEPBITMAP_AND ATOMIC_AND
#endif
#if MACOS && __MACH__
#include <sys/param.h>
#include <sys/sysctl.h>
#endif
#if HAVE_LIBNUMA
#include <numa.h>
#endif
#if defined(_MSC_VER)
# define strcasecmp _stricmp
#endif
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
/* SWAR masks: alternating bits, bit pairs, nibbles, and the per-byte 0x01
 * multiplier used to sum all byte lanes into the top byte. */
const uint64_t m1  = 0x5555555555555555;
const uint64_t m2  = 0x3333333333333333;
const uint64_t m3  = 0x0f0f0f0f0f0f0f0f;
const uint64_t h01 = 0x0101010101010101;

/* Returns the number of set bits in x (0..64). */
static int popCount(uint64_t x)
{
    // Each 2-bit lane now holds the count of its two source bits.
    const uint64_t pairSums = x - ((x >> 1) & m1);
    // Each 4-bit lane holds the count of its four source bits.
    const uint64_t nibbleSums = (pairSums & m2) + ((pairSums >> 2) & m2);
    // Each byte holds the count of its eight source bits.
    const uint64_t byteSums = (nibbleSums + (nibbleSums >> 4)) & m3;
    // Multiplying by 0x0101... accumulates every byte into the most significant byte.
    return (int)((byteSums * h01) >> 56);
}
#endif
namespace X265_NS {
/* A single pool worker. Each worker owns a wake event and is identified by
 * m_id, its bit position in the pool's sleep bitmap and in job providers'
 * owner bitmaps. */
class WorkerThread : public Thread
{
private:

    ThreadPool&  m_pool;
    int          m_id;
    Event        m_wakeEvent;

    // Not assignable (declared, never defined).
    WorkerThread& operator =(const WorkerThread&);

public:

    // Job provider this worker currently pulls work from.
    JobProvider*     m_curJobProvider;
    // Bonded task group to service on the next wake-up, or NULL.
    BondedTaskGroup* m_bondMaster;

    /* Both public pointers start NULL: workers are placement-new'd into raw
     * malloc'd storage, so without this they would hold garbage until
     * threadMain() assigns them. */
    WorkerThread(ThreadPool& pool, int id)
        : m_pool(pool), m_id(id), m_curJobProvider(NULL), m_bondMaster(NULL) {}

    virtual ~WorkerThread() {}

    void threadMain();

    // Signals the wake event that threadMain() blocks on.
    void awaken() { m_wakeEvent.trigger(); }
};
// Worker thread entry point. Names the thread, lowers its scheduling priority,
// applies the pool's affinity, registers the worker as an owner of job
// provider 0 and as sleeping in the pool, then blocks until awakened. While
// the pool is active it either services the bonded task group it was assigned
// to, or pulls jobs from job providers that want help, then marks itself
// sleeping again and waits for the next wake-up.
void WorkerThread::threadMain()
{
THREAD_NAME("Worker", m_id);
#if _WIN32
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
#else
__attribute__((unused)) int val = nice(10);
#endif
m_pool.setCurrentThreadAffinity();
// This worker's bit in the pool sleep bitmap and in providers' owner bitmaps.
sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
m_curJobProvider = m_pool.m_jpTable[0];
m_bondMaster = NULL;
SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
// Publish this worker as sleeping so tryAcquireSleepingThread() may claim it.
SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
m_wakeEvent.wait();
while (m_pool.m_isActive)
{
// m_bondMaster is set by ThreadPool::tryBondPeers() just before awaken().
if (m_bondMaster)
{
m_bondMaster->processTasks(m_id);
m_bondMaster->m_exitedPeerCount.incr();
m_bondMaster = NULL;
}
do
{
m_curJobProvider->findJob(m_id);
// Smaller m_sliceType wins below. If the current provider no longer wants
// help, start from INVALID_SLICE_PRIORITY + 1 so that any provider still
// wanting help is preferred (presumably above every real m_sliceType; confirm).
int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
INVALID_SLICE_PRIORITY + 1;
int nextProvider = -1;
for (int i = 0; i < m_pool.m_numProviders; i++)
{
if (m_pool.m_jpTable[i]->m_helpWanted &&
m_pool.m_jpTable[i]->m_sliceType < curPriority)
{
nextProvider = i;
curPriority = m_pool.m_jpTable[i]->m_sliceType;
}
}
// Move ownership of this worker to the chosen provider.
if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
{
SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
m_curJobProvider = m_pool.m_jpTable[nextProvider];
SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
}
}
while (m_curJobProvider->m_helpWanted);
// No more work wanted: go back to sleep until awakened again.
SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
m_wakeEvent.wait();
}
// Mark as sleeping on exit; ThreadPool::stopWorkers() spins until this bit is
// set before awakening and stopping each worker.
SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
}
void JobProvider::tryWakeOne()
{
int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
if (id < 0)
{
m_helpWanted = true;
return;
}
WorkerThread& worker = m_pool->m_workers[id];
if (worker.m_curJobProvider != this)
{
sleepbitmap_t bit = (sleepbitmap_t)1 << id;
SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
worker.m_curJobProvider = this;
SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
}
worker.awaken();
}
/* Atomically claims one sleeping worker and returns its id, or -1 if none is
 * available. Workers in firstTryBitmap are tried first, then those in
 * secondTryBitmap. A worker is claimed by clearing its bit in m_sleepBitmap;
 * the claim succeeds only if the previous value still had the bit set, so two
 * callers cannot acquire the same worker. */
int ThreadPool::tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap)
{
    const sleepbitmap_t candidateSets[2] = { firstTryBitmap, secondTryBitmap };

    for (int pass = 0; pass < 2; pass++)
    {
        sleepbitmap_t idle;
        // Re-read the live bitmap after every failed claim attempt.
        while ((idle = m_sleepBitmap & candidateSets[pass]) != 0)
        {
            unsigned long id;
            SLEEPBITMAP_CTZ(id, idle);
            const sleepbitmap_t bit = (sleepbitmap_t)1 << id;
            if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit)
                return (int)id;
        }
    }

    return -1;
}
/* Bonds up to maxPeers sleeping workers from peerBitmap to the task group
 * `master`: each acquired worker gets m_bondMaster set and is awakened so it
 * runs master.processTasks(). Returns the number of workers actually bonded.
 *
 * The limit is checked before every acquisition. The previous do/while form
 * bonded one worker even when maxPeers <= 0. */
int ThreadPool::tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master)
{
    int bondCount = 0;

    while (bondCount < maxPeers)
    {
        // Only sleeping workers in peerBitmap are eligible (no fallback set).
        int id = tryAcquireSleepingThread(peerBitmap, 0);
        if (id < 0)
            break;

        m_workers[id].m_bondMaster = &master;
        m_workers[id].awaken();
        bondCount++;
    }

    return bondCount;
}
/* Decides how many thread pools to create and with how many threads each,
 * based on the detected CPUs per NUMA node and the --pools string
 * (p->numaPools). It then allocates and creates the pools.
 *
 * threadsPerPool[i] / nodeMaskPerPool[i] for i < numNumaNodes describe a pool
 * bound to node i; index numNumaNodes describes a pool spanning several nodes.
 *
 * p          encoder parameters; may be updated (pools, frameNumThreads,
 *            lookaheadThreads).
 * numPools   out: number of pools returned (0 on failure / no pools).
 * isThreadsReserved  when true, a single pool sized by p->lookaheadThreads is
 *            created with one provider slot.
 * Returns a new[]-allocated array of numPools ThreadPools, or NULL. */
ThreadPool* ThreadPool::allocThreadPools(x265_param* p, int& numPools, bool isThreadsReserved)
{
    enum { MAX_NODE_NUM = 127 };
    int cpusPerNode[MAX_NODE_NUM + 1];
    int threadsPerPool[MAX_NODE_NUM + 2];
    uint64_t nodeMaskPerPool[MAX_NODE_NUM + 2];
    int totalNumThreads = 0;

    memset(cpusPerNode, 0, sizeof(cpusPerNode));
    memset(threadsPerPool, 0, sizeof(threadsPerPool));
    memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));

    // NOTE(review): node masks are uint64_t, so nodes >= 64 cannot be represented;
    // the 1 << j shifts below are undefined if more than 64 nodes are reported.
    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
    bool bNumaSupport = false;

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    bNumaSupport = true;
#elif HAVE_LIBNUMA
    bNumaSupport = numa_available() >= 0;
#endif

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    for (int i = 0; i < numNumaNodes; i++)
    {
        // Zero-initialise and check the call: on failure the mask would
        // otherwise be uninitialised (or stale from the previous node).
        GROUP_AFFINITY groupAffinity;
        memset(&groupAffinity, 0, sizeof(groupAffinity));
        if (GetNumaNodeProcessorMaskEx((USHORT)i, &groupAffinity))
            cpusPerNode[i] = popCount(groupAffinity.Mask);
    }
#elif HAVE_LIBNUMA
    if (bNumaSupport)
    {
        struct bitmask* bitMask = numa_allocate_cpumask();
        for (int i = 0; i < numNumaNodes; i++)
        {
            int ret = numa_node_to_cpus(i, bitMask);
            if (!ret)
                cpusPerNode[i] = numa_bitmask_weight(bitMask);
            else
                x265_log(p, X265_LOG_ERROR, "Failed to genrate CPU mask\n");
        }
        numa_free_cpumask(bitMask);
    }
#else
    cpusPerNode[0] = getCpuCount();
#endif

    if (bNumaSupport && p->logLevel >= X265_LOG_DEBUG)
        for (int i = 0; i < numNumaNodes; i++)
            x265_log(p, X265_LOG_DEBUG, "detected NUMA node %d with %d logical cores\n", i, cpusPerNode[i]);

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    // With no explicit --pools, build one "<cpus>,<cpus>,..." entry per node.
    if (!p->numaPools || (strcmp(p->numaPools, "NULL") == 0 || strcmp(p->numaPools, "*") == 0 || strcmp(p->numaPools, "") == 0))
    {
        // Room for every node's ",<int>" entry (at most 12 chars each) plus NUL;
        // the former 50-byte buffer overflowed beyond roughly 16 nodes.
        char poolString[(MAX_NODE_NUM + 1) * 16] = "";
        int poolLen = 0;
        for (int i = 0; i < numNumaNodes; i++)
        {
            if (i)
                poolLen += sprintf(poolString + poolLen, ",%d", cpusPerNode[i]);
            else
                poolLen += sprintf(poolString + poolLen, "%d", cpusPerNode[i]);
        }
        x265_param_parse(p, "pools", poolString);
    }
#endif

    if (p->numaPools && *p->numaPools)
    {
        // Parse one comma-separated --pools entry per NUMA node.
        const char *nodeStr = p->numaPools;
        for (int i = 0; i < numNumaNodes; i++)
        {
            if (!*nodeStr)
            {
                threadsPerPool[i] = 0;
                continue;
            }
            else if (*nodeStr == '-')
                threadsPerPool[i] = 0;
            else if (*nodeStr == '*' || !strcasecmp(nodeStr, "NULL"))
            {
                // This and all remaining nodes join the shared multi-node pool.
                for (int j = i; j < numNumaNodes; j++)
                {
                    threadsPerPool[numNumaNodes] += cpusPerNode[j];
                    nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << j);
                }
                break;
            }
            else if (*nodeStr == '+')
            {
                threadsPerPool[numNumaNodes] += cpusPerNode[i];
                nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
            }
            else
            {
                int count = atoi(nodeStr);
                if (i > 0 || strchr(nodeStr, ','))
                {
                    threadsPerPool[i] = X265_MIN(count, cpusPerNode[i]);
                    nodeMaskPerPool[i] = ((uint64_t)1 << i);
                }
                else
                {
                    // A single bare number: one thread budget spanning all nodes.
                    threadsPerPool[numNumaNodes] = X265_MIN(count, numNumaNodes * MAX_POOL_THREADS);
                    nodeMaskPerPool[numNumaNodes] = ((uint64_t)-1 >> (64 - numNumaNodes));
                }
            }

            // Advance past this entry and its separator.
            while (*nodeStr && *nodeStr != ',')
                ++nodeStr;
            if (*nodeStr == ',' || *nodeStr == ' ')
                ++nodeStr;
        }
    }
    else
    {
        for (int i = 0; i < numNumaNodes; i++)
        {
            threadsPerPool[numNumaNodes] += cpusPerNode[i];
            nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
        }
    }

    // Trim a small remainder so multi-node pools stay evenly sized.
    if ((threadsPerPool[numNumaNodes] > MAX_POOL_THREADS) &&
        ((threadsPerPool[numNumaNodes] % MAX_POOL_THREADS) < (MAX_POOL_THREADS / 2)))
    {
        threadsPerPool[numNumaNodes] -= (threadsPerPool[numNumaNodes] % MAX_POOL_THREADS);
        x265_log(p, X265_LOG_DEBUG,
                 "Creating only %d worker threads beyond specified numbers with --pools (if specified) to prevent asymmetry in pools; may not use all HW contexts\n", threadsPerPool[numNumaNodes]);
    }

    numPools = 0;
    for (int i = 0; i < numNumaNodes + 1; i++)
    {
        if (bNumaSupport)
            x265_log(p, X265_LOG_DEBUG, "NUMA node %d may use %d logical cores\n", i, cpusPerNode[i]);
        if (threadsPerPool[i])
        {
            numPools += (threadsPerPool[i] + MAX_POOL_THREADS - 1) / MAX_POOL_THREADS;
            totalNumThreads += threadsPerPool[i];
        }
    }

    if (!isThreadsReserved)
    {
        if (!numPools)
        {
            x265_log(p, X265_LOG_DEBUG, "No pool thread available. Deciding frame-threads based on detected CPU threads\n");
            totalNumThreads = ThreadPool::getCpuCount();
        }
        if (!p->frameNumThreads)
            ThreadPool::getFrameThreadsCount(p, totalNumThreads);
    }

    if (!numPools)
        return NULL;

    if (numPools > p->frameNumThreads)
    {
        x265_log(p, X265_LOG_DEBUG, "Reducing number of thread pools for frame thread count\n");
        numPools = X265_MAX(p->frameNumThreads / 2, 1);
    }
    if (isThreadsReserved)
        numPools = 1;

    ThreadPool *pools = new ThreadPool[numPools];
    if (pools)
    {
        int maxProviders = (p->frameNumThreads + numPools - 1) / numPools + !isThreadsReserved;
        int node = 0;
        for (int i = 0; i < numPools; i++)
        {
            while (!threadsPerPool[node])
                node++;
            int numThreads = X265_MIN(MAX_POOL_THREADS, threadsPerPool[node]);
            int origNumThreads = numThreads;
            if (i == 0 && p->lookaheadThreads > numThreads / 2)
            {
                p->lookaheadThreads = numThreads / 2;
                x265_log(p, X265_LOG_DEBUG, "Setting lookahead threads to a maximum of half the total number of threads\n");
            }
            if (isThreadsReserved)
            {
                numThreads = p->lookaheadThreads;
                maxProviders = 1;
            }
            else if (i == 0)
                numThreads -= p->lookaheadThreads;

            if (!pools[i].create(numThreads, maxProviders, nodeMaskPerPool[node]))
            {
                // pools came from new[], so release it with delete[] (not
                // X265_FREE). This also runs each ThreadPool destructor, freeing
                // the worker/provider tables of pools already created.
                delete[] pools;
                numPools = 0;
                return NULL;
            }

            if (numNumaNodes > 1)
            {
                // Zero-initialised so nodesstr + 1 is a valid string even if
                // no mask bit is set.
                char nodesstr[64 * sizeof(",63")] = "";
                int len = 0;
                for (int j = 0; j < 64; j++)
                    if ((nodeMaskPerPool[node] >> j) & 1)
                        len += sprintf(nodesstr + len, ",%d", j);
                x265_log(p, X265_LOG_INFO, "Thread pool %d using %d threads on numa nodes %s\n", i, numThreads, nodesstr + 1);
            }
            else
                x265_log(p, X265_LOG_INFO, "Thread pool created using %d threads\n", numThreads);
            threadsPerPool[node] -= origNumThreads;
        }
    }
    else
        numPools = 0;
    return pools;
}
/* Zero-fills every member of the pool (pointers NULL, counts 0, flags false)
 * before create() is called. */
ThreadPool::ThreadPool()
{
    memset(static_cast<void*>(this), 0, sizeof(ThreadPool));
}
/* Initialises the pool: records its NUMA affinity from nodeMask (bit i =
 * NUMA node i), constructs numThreads WorkerThreads in X265_MALLOC storage and
 * allocates a job-provider table with maxProviders slots. Threads are not
 * started here (see start()). Returns false if either allocation failed. */
bool ThreadPool::create(int numThreads, int maxProviders, uint64_t nodeMask)
{
    X265_CHECK(numThreads <= MAX_POOL_THREADS, "a single thread pool cannot have more than MAX_POOL_THREADS threads\n");

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    memset(&m_groupAffinity, 0, sizeof(GROUP_AFFINITY));
    // Use the processor mask of the first node in nodeMask that can be queried.
    // i is capped at 64 because shifting a uint64_t by 64 or more is undefined.
    for (int i = 0; i < getNumaNodeCount() && i < 64; i++)
    {
        int numaNode = ((nodeMask >> i) & 0x1U) ? i : -1;
        if (numaNode != -1)
            if (GetNumaNodeProcessorMaskEx((USHORT)numaNode, &m_groupAffinity))
                break;
    }
    m_numaMask = &m_groupAffinity.Mask;
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
    {
        struct bitmask* nodemask = numa_allocate_nodemask();
        if (nodemask)
        {
            *(nodemask->maskp) = nodeMask;
            m_numaMask = nodemask;
        }
        else
            // %llx with an explicit cast: %lx is wrong where uint64_t is
            // unsigned long long (e.g. 32-bit Linux).
            x265_log(NULL, X265_LOG_ERROR, "unable to get NUMA node mask for %llx\n", (unsigned long long)nodeMask);
    }
#else
    (void)nodeMask;
#endif

    m_numWorkers = numThreads;

    m_workers = X265_MALLOC(WorkerThread, numThreads);
    // Placement-new each worker into the raw storage; ~ThreadPool destroys them.
    if (m_workers)
        for (int i = 0; i < numThreads; i++)
            new (m_workers + i)WorkerThread(*this, i);

    m_jpTable = X265_MALLOC(JobProvider*, maxProviders);
    m_numProviders = 0;

    return m_workers && m_jpTable;
}
/* Marks the pool active and starts every worker thread in order. If any
 * worker fails to start, the pool is marked inactive again and false is
 * returned (workers started before it are not stopped here). */
bool ThreadPool::start()
{
    m_isActive = true;

    int launched = 0;
    while (launched < m_numWorkers && m_workers[launched].start())
        ++launched;

    if (launched == m_numWorkers)
        return true;

    m_isActive = false;
    return false;
}
void ThreadPool::stopWorkers()
{
if (m_workers)
{
m_isActive = false;
for (int i = 0; i < m_numWorkers; i++)
{
while (!(m_sleepBitmap & ((sleepbitmap_t)1 << i)))
GIVE_UP_TIME();
m_workers[i].awaken();
m_workers[i].stop();
}
}
}
/* Destroys the workers (constructed with placement new in create()), then
 * frees the worker array, the provider table and, with libnuma, the node
 * mask allocated in create(). */
ThreadPool::~ThreadPool()
{
    if (m_workers)
    {
        for (int w = 0; w < m_numWorkers; ++w)
        {
            WorkerThread& worker = m_workers[w];
            worker.~WorkerThread();
        }
    }

    X265_FREE(m_workers);
    X265_FREE(m_jpTable);

#if HAVE_LIBNUMA
    if (m_numaMask != NULL)
        numa_free_nodemask((struct bitmask*)m_numaMask);
#endif
}
/* Applies this pool's NUMA mask (m_numaMask) to the calling thread. */
void ThreadPool::setCurrentThreadAffinity()
{
    void* const poolMask = m_numaMask;
    setThreadNodeAffinity(poolMask);
}
/* Binds the calling thread to a NUMA node set.
 * Windows 7+: numaMask is ignored; this pool's m_groupAffinity is applied.
 * libnuma: numaMask is a struct bitmask* of nodes; the thread is restricted to
 * those nodes with interleaved memory policy, then local allocation is set.
 * An error is logged if the affinity cannot be applied, including when
 * libnuma is usable but no mask was supplied (previously a NULL mask was
 * passed straight to numa_run_on_node_mask, which dereferences it). */
void ThreadPool::setThreadNodeAffinity(void *numaMask)
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    UNREFERENCED_PARAMETER(numaMask);
    GROUP_AFFINITY groupAffinity;
    memset(&groupAffinity, 0, sizeof(GROUP_AFFINITY));
    groupAffinity.Group = m_groupAffinity.Group;
    groupAffinity.Mask = m_groupAffinity.Mask;
    const PGROUP_AFFINITY affinityPointer = &groupAffinity;
    if (SetThreadGroupAffinity(GetCurrentThread(), affinityPointer, NULL))
        return;
    else
        x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#elif HAVE_LIBNUMA
    if (numa_available() >= 0 && numaMask)
    {
        numa_run_on_node_mask((struct bitmask*)numaMask);
        numa_set_interleave_mask((struct bitmask*)numaMask);
        numa_set_localalloc();
        return;
    }
    x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#else
    (void)numaMask;
#endif
    return;
}
/* Returns the number of NUMA nodes (highest node number + 1), or 1 when NUMA
 * information is unavailable on this platform/build. */
int ThreadPool::getNumaNodeCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    ULONG highestNode = 1;
    const bool queried = GetNumaHighestNodeNumber(&highestNode) != 0;
    return queried ? (int)(highestNode + 1) : (int)highestNode;
#elif HAVE_LIBNUMA
    return (numa_available() >= 0) ? numa_max_node() + 1 : 1;
#else
    return 1;
#endif
}
/* Returns the number of logical CPUs usable by the encoder, using the most
 * specific query available on the platform.
 *
 * Fixes relative to the previous version:
 *  - Windows 7+: a failed GetNumaNodeProcessorMaskEx no longer popcounts an
 *    uninitialised mask.
 *  - unix: a failed sysconf (-1) yields 1 instead of a negative count.
 *  - macOS: count and len are initialised, so a failed sysctl no longer reads
 *    an uninitialised value. */
int ThreadPool::getCpuCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    enum { MAX_NODE_NUM = 127 };
    int cpus = 0;
    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
    for (int i = 0; i < numNumaNodes; i++)
    {
        GROUP_AFFINITY groupAffinity;
        memset(&groupAffinity, 0, sizeof(groupAffinity));
        if (GetNumaNodeProcessorMaskEx((USHORT)i, &groupAffinity))
            cpus += popCount(groupAffinity.Mask);
    }
    return cpus;
#elif _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return sysinfo.dwNumberOfProcessors;
#elif __unix__ && X265_ARCH_ARM
    long configured = sysconf(_SC_NPROCESSORS_CONF);
    return configured > 0 ? (int)configured : 1;
#elif __unix__
    long online = sysconf(_SC_NPROCESSORS_ONLN);
    return online > 0 ? (int)online : 1;
#elif MACOS && __MACH__
    int nm[2];
    uint32_t count = 0;
    size_t len = sizeof(count);
    nm[0] = CTL_HW;
    nm[1] = HW_AVAILCPU;
    sysctl(nm, 2, &count, &len, NULL, 0);
    if (count < 1)
    {
        // Fall back to the total CPU count, then to a single CPU.
        nm[1] = HW_NCPU;
        count = 0;
        len = sizeof(count);
        sysctl(nm, 2, &count, &len, NULL, 0);
        if (count < 1)
            count = 1;
    }
    return count;
#else
    return 2;
#endif
}
/* Chooses p->frameNumThreads from the available CPU count. Without wavefront
 * parallelism the count is bounded by cpuCount, half the number of
 * maxCUSize-high block rows (rounded up), and X265_MAX_FRAME_THREADS. With
 * wavefront it follows a fixed CPU-count ladder (1..6). */
void ThreadPool::getFrameThreadsCount(x265_param* p, int cpuCount)
{
    const int blockRows = (p->sourceHeight + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize];

    if (!p->bEnableWavefront)
    {
        p->frameNumThreads = X265_MIN3(cpuCount, (blockRows + 1) / 2, X265_MAX_FRAME_THREADS);
        return;
    }

    int frameThreads = 1;
    if (cpuCount >= 32)
        frameThreads = (p->sourceHeight > 2000) ? 6 : 5;
    else if (cpuCount >= 16)
        frameThreads = 4;
    else if (cpuCount >= 8)
        frameThreads = 3;
    else if (cpuCount >= 4)
        frameThreads = 2;

    p->frameNumThreads = frameThreads;
}
}