#ifndef EIGEN_PARALLELIZER_H
#define EIGEN_PARALLELIZER_H
#include "../InternalHeaderCheck.h"
#if defined(EIGEN_HAS_OPENMP) && defined(EIGEN_GEMM_THREADPOOL)
#error "EIGEN_HAS_OPENMP and EIGEN_GEMM_THREADPOOL may not both be defined."
#endif
namespace Eigen {
namespace internal {
// Forward declaration of the shared backend that stores/retrieves the maximum
// GEMM thread count; defined below in one of the configuration-dependent branches.
inline void manage_multi_threading(Action action, int* v);
}
// Deprecated public entry point kept for API compatibility (implementation
// elided from this view).
EIGEN_DEPRECATED inline void initParallel() { … }
// Returns the maximum number of threads Eigen may use — presumably via
// manage_multi_threading(GetAction, ...); implementation elided from this view.
inline int nbThreads() { … }
// Sets the maximum number of threads (0 presumably meaning "automatic") — likely
// via manage_multi_threading(SetAction, ...); implementation elided from this view.
inline void setNbThreads(int v) { … }
#ifdef EIGEN_GEMM_THREADPOOL
// Installs `new_pool` as the process-wide ThreadPool used by GEMM and returns
// the currently installed pool. Passing nullptr performs a pure read: the
// stored pool is left untouched.
// NOTE(review): not synchronized — concurrent installation races on the static
// pointer; presumably the pool is registered once at startup. TODO confirm.
inline ThreadPool* setGemmThreadPool(ThreadPool* new_pool) {
  static ThreadPool* pool = nullptr;  // zero-initialized either way; explicit for clarity
  if (new_pool == nullptr) return pool;
  pool = new_pool;
  // Keep Eigen's configured thread count in sync with the pool's size.
  setNbThreads(pool->NumThreads());
  return pool;
}
// Returns the ThreadPool registered for GEMM, or nullptr if none was installed.
inline ThreadPool* getGemmThreadPool() {
  // Passing nullptr leaves the stored pool unchanged and simply reads it back.
  return setGemmThreadPool(nullptr);
}
#endif
namespace internal {
#if defined(EIGEN_USE_BLAS) || (!defined(EIGEN_HAS_OPENMP) && !defined(EIGEN_GEMM_THREADPOOL))
// Serial / BLAS configuration: no threading backend is available, so this
// backend presumably degenerates to trivial get/set handling (body elided
// from this view).
inline void manage_multi_threading(Action action, int* v) { … }
// Placeholder per-worker info type for the serial path (members elided from
// this view); mirrors the interface of the threaded GemmParallelInfo below.
template <typename Index>
struct GemmParallelInfo { … };
// Serial fallback: with no threading backend the functor is presumably invoked
// once over the whole [0,rows)x[0,cols) range (body elided from this view).
// The unnamed Index/bool parameters match the threaded overload's depth/transpose.
template <bool Condition, typename Functor, typename Index>
EIGEN_STRONG_INLINE void parallelize_gemm(const Functor& func, Index rows, Index cols, Index ,
bool ) { … }
#else
// Per-thread record used by the GEMM workers to coordinate access to packed
// operand blocks. NOTE(review): field semantics inferred from names — the
// consumers live in the GEMM kernels elsewhere in the library.
template <typename Index>
struct GemmParallelTaskInfo {
  std::atomic<Index> sync{-1};  // -1 until this thread publishes work
  std::atomic<int> users{0};    // number of threads currently reading this record
  Index lhs_start{0};           // start row of this thread's lhs slice
  Index lhs_length{0};          // number of rows in this thread's lhs slice
};
template <typename Index>
struct GemmParallelInfo {
const int logical_thread_id;
const int num_threads;
GemmParallelTaskInfo<Index>* task_info;
GemmParallelInfo(int logical_thread_id_, int num_threads_, GemmParallelTaskInfo<Index>* task_info_)
: logical_thread_id(logical_thread_id_), num_threads(num_threads_), task_info(task_info_) {}
};
// Backend for nbThreads()/setNbThreads(): keeps the user-visible maximum
// thread count in a function-local static.
//  - SetAction: *v is the requested count (asserted >= 0); 0 means "use
//    everything the backend offers". The stored value is clamped to what the
//    OpenMP runtime / registered ThreadPool actually provides.
//  - GetAction: writes the stored value into *v (-1 until SetAction has run,
//    which callers treat as "not configured yet").
// Any other action is a programming error.
inline void manage_multi_threading(Action action, int* v) {
  static int m_maxThreads = -1;  // -1 == not configured yet
  if (action == SetAction) {
    eigen_internal_assert(v != nullptr);
#if defined(EIGEN_HAS_OPENMP)
    eigen_internal_assert(*v >= 0);
    int omp_threads = omp_get_max_threads();
    // Use numext::mini consistently with the threadpool branch below
    // (original mixed std::min here with numext::mini there).
    m_maxThreads = (*v == 0 ? omp_threads : numext::mini(*v, omp_threads));
#elif defined(EIGEN_GEMM_THREADPOOL)
    eigen_internal_assert(*v >= 0);
    ThreadPool* pool = getGemmThreadPool();
    // Without a registered pool there is effectively a single thread.
    int pool_threads = pool != nullptr ? pool->NumThreads() : 1;
    m_maxThreads = (*v == 0 ? pool_threads : numext::mini(pool_threads, *v));
#endif
  } else if (action == GetAction) {
    eigen_internal_assert(v != nullptr);
    *v = m_maxThreads;
  } else {
    eigen_internal_assert(false);
  }
}
// Runs the GEMM functor either serially or split across threads.
// `func(r0, nrows, c0, ncols, info)` computes the product block
// [r0, r0+nrows) x [c0, c0+ncols); `depth` only feeds the work estimate.
// `transpose` selects whether the matrix is sliced along rows or columns
// (rows/cols are swapped before slicing when set).
template <bool Condition, typename Functor, typename Index>
EIGEN_STRONG_INLINE void parallelize_gemm(const Functor& func, Index rows, Index cols, Index depth, bool transpose) {
  // Thread-count heuristic: at most one packet strip (Traits::nr columns) per
  // thread, and at least ~kMinTaskSize scalar multiply-adds per thread.
  Index size = transpose ? rows : cols;
  Index pb_max_threads = std::max<Index>(1, size / Functor::Traits::nr);
  double work = static_cast<double>(rows) * static_cast<double>(cols) * static_cast<double>(depth);
  constexpr double kMinTaskSize = 50000;
  pb_max_threads = std::max<Index>(1, std::min<Index>(pb_max_threads, static_cast<Index>(work / kMinTaskSize)));
  int threads = std::min<int>(nbThreads(), static_cast<int>(pb_max_threads));

  // Serial fallback when parallelism is disabled, not worthwhile, or we are
  // already running inside a parallel region (no nested parallelism).
  bool dont_parallelize = (!Condition) || (threads <= 1);
#if defined(EIGEN_HAS_OPENMP)
  dont_parallelize |= omp_get_num_threads() > 1;
#elif defined(EIGEN_GEMM_THREADPOOL)
  ThreadPool* pool = getGemmThreadPool();
  dont_parallelize |= (pool == nullptr || pool->CurrentThreadId() != -1);
#endif
  if (dont_parallelize) return func(0, rows, 0, cols);

  func.initParallelSession(threads);
  if (transpose) std::swap(rows, cols);

  // Shared per-thread records used to coordinate lhs packing between workers.
  ei_declare_aligned_stack_constructed_variable(GemmParallelTaskInfo<Index>, task_info, threads, 0);

#if defined(EIGEN_HAS_OPENMP)
#pragma omp parallel num_threads(threads)
  {
    Index i = omp_get_thread_num();
    // OpenMP may spawn fewer threads than requested; partition by the actual count.
    Index actual_threads = omp_get_num_threads();
    GemmParallelInfo<Index> info(i, static_cast<int>(actual_threads), task_info);

    Index blockCols = (cols / actual_threads) & ~Index(0x3);  // round down to multiple of 4
    Index blockRows = (rows / actual_threads);
    blockRows = (blockRows / Functor::Traits::mr) * Functor::Traits::mr;  // multiple of mr

    Index r0 = i * blockRows;
    Index actualBlockRows = (i + 1 == actual_threads) ? rows - r0 : blockRows;  // last thread takes the remainder
    Index c0 = i * blockCols;
    Index actualBlockCols = (i + 1 == actual_threads) ? cols - c0 : blockCols;

    info.task_info[i].lhs_start = r0;
    info.task_info[i].lhs_length = actualBlockRows;

    if (transpose)
      func(c0, actualBlockCols, 0, rows, &info);
    else
      func(0, rows, c0, actualBlockCols, &info);
  }
#elif defined(EIGEN_GEMM_THREADPOOL)
  // FIX: removed the unused `meta_info` stack array the original allocated
  // here — it duplicated `task_info` and was never read.
  Barrier barrier(threads);
  auto task = [=, &func, &barrier, &task_info](int i) {
    // The pool runs exactly `threads` tasks; unlike OpenMP the count is fixed.
    Index actual_threads = threads;
    GemmParallelInfo<Index> info(i, static_cast<int>(actual_threads), task_info);
    Index blockCols = (cols / actual_threads) & ~Index(0x3);  // round down to multiple of 4
    Index blockRows = (rows / actual_threads);
    blockRows = (blockRows / Functor::Traits::mr) * Functor::Traits::mr;  // multiple of mr
    Index r0 = i * blockRows;
    Index actualBlockRows = (i + 1 == actual_threads) ? rows - r0 : blockRows;  // last task takes the remainder
    Index c0 = i * blockCols;
    Index actualBlockCols = (i + 1 == actual_threads) ? cols - c0 : blockCols;
    info.task_info[i].lhs_start = r0;
    info.task_info[i].lhs_length = actualBlockRows;
    if (transpose)
      func(c0, actualBlockCols, 0, rows, &info);
    else
      func(0, rows, c0, actualBlockCols, &info);
    barrier.Notify();
  };
  // FIX: the original used `task = std::move(task)` in this loop, moving from
  // `task` on the first iteration and then reusing the moved-from lambda on
  // every later iteration and in the direct call below. Copy it instead — the
  // captures are scalars and references, so copies are cheap and each worker
  // gets its own intact closure.
  for (int i = 0; i < threads - 1; ++i) {
    pool->Schedule([task, i] { task(i); });
  }
  task(threads - 1);  // run the last chunk on the calling thread
  barrier.Wait();     // wait until all `threads` tasks have notified
#endif
}
#endif
}
}
#endif