#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
#include "./InternalHeaderCheck.h"
namespace Eigen {
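// Runs an arbitrary function and then calls Notify() on the passed-in
// Notification.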
template <typename Function, typename... Args>
struct FunctionWrapperWithNotification {
static void run(Notification* n, Function f, Args... args) {
f(args...);
if (n) {
n->Notify();
}
}
};
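// Runs an arbitrary function and then calls Notify() on the passed-in
// Barrier.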
template <typename Function, typename... Args>
struct FunctionWrapperWithBarrier {
static void run(Barrier* b, Function f, Args... args) {
f(args...);
if (b) {
b->Notify();
}
}
};
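// Blocks until the synchronization object (e.g. a Notification or Barrier)
// becomes ready; a null pointer is treated as already ready.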
template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
if (n) {
n->Wait();
}
}
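// An abstract interface to a device specific memory allocator.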
class Allocator {
public:
virtual ~Allocator() {}
virtual void* allocate(size_t num_bytes) const = 0;
virtual void deallocate(void* buffer) const = 0;
};
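// Build a thread pool device on top of an existing pool of threads.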
struct ThreadPoolDevice {
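// The ownership of the thread pool remains with the caller.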
ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
: pool_(pool), num_threads_(num_cores), allocator_(allocator) {}
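// Minimal usage sketch (assumes an Eigen::ThreadPool that outlives the device
// and tensors `a`, `b`, `out` of matching shape):
//   Eigen::ThreadPool pool(4 /*num_threads*/);
//   Eigen::ThreadPoolDevice device(&pool, 4 /*num_cores*/);
//   out.device(device) = a + b;  // evaluate the expression on the pool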
EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
return allocator_ ? allocator_->allocate(num_bytes) : internal::aligned_malloc(num_bytes);
}
EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
if (allocator_) {
allocator_->deallocate(buffer);
} else {
internal::aligned_free(buffer);
}
}
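// Temporary buffers use the same allocator as regular allocations.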
EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const { return allocate(num_bytes); }
EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const { deallocate(buffer); }
template <typename Type>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
return data;
}
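// Copies n bytes from src to dst. Large copies are split into blocks and
// distributed over the thread pool (except on Android, where a plain memcpy
// is used).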
EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
::memcpy(dst, src, n);
#else
// Small copies are not worth the scheduling overhead; also cap the number of
// threads at 4, since memcpy is typically memory-bandwidth bound.
const size_t kMinBlockSize = 32768;
const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
if (n <= kMinBlockSize || num_threads < 2) {
::memcpy(dst, src, n);
} else {
// Split the copy into num_threads blocks: blocks 1..num_threads-1 are
// scheduled on the pool, block 0 is copied on the calling thread.
const char* src_ptr = static_cast<const char*>(src);
char* dst_ptr = static_cast<char*>(dst);
const size_t blocksize = (n + (num_threads - 1)) / num_threads;
Barrier barrier(static_cast<int>(num_threads - 1));
for (size_t i = 1; i < num_threads; ++i) {
enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
// The last block may be shorter than blocksize.
::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize, numext::mini(blocksize, n - (i * blocksize)));
});
}
// Copy the first block on the calling thread, then wait for the others.
::memcpy(dst_ptr, src_ptr, blocksize);
barrier.Wait();
}
#endif
}
EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }
EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }
EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const { ::memset(buffer, c, n); }
template <typename T>
EIGEN_STRONG_INLINE void fill(T* begin, T* end, const T& value) const {
std::fill(begin, end, value);
}
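// Number of threads available for expression evaluation. This is the value
// passed to the constructor and may differ from the size of the underlying
// pool, which numThreadsInPool() reports.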
EIGEN_STRONG_INLINE int numThreads() const { return num_threads_; }
EIGEN_STRONG_INLINE int numThreadsInPool() const { return pool_->NumThreads(); }
EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const { return l1CacheSize(); }
EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
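// The L3 cache is shared between all cores, so report a per-thread share.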
return l3CacheSize() / num_threads_;
}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void synchronize() const {
// Nothing to do: the CPU thread pool device tracks completion explicitly via
// Notification/Barrier objects rather than a device-wide sync point.
}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
// Should return an enum that encodes the ISA supported by the CPU.
return 1;
}
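// Schedules f(args...) on the pool and returns a heap-allocated Notification
// that is signaled when the task completes. The caller is responsible for
// deleting the returned Notification after waiting on it.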
template <class Function, class... Args>
EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
Notification* n = new Notification();
pool_->Schedule(
std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, std::forward<Function>(f), args...));
return n;
}
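// Schedules f(args...) on the pool and notifies the given Barrier when the
// task completes.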
template <class Function, class... Args>
EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f, Args&&... args) const {
pool_->Schedule(
std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b, std::forward<Function>(f), args...));
}
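// Schedules f(args...) on the pool with no completion signal. The std::bind
// is skipped when there are no arguments to forward.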
template <class Function, class... Args>
EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
if (sizeof...(args) > 0) {
pool_->Schedule(std::bind(std::forward<Function>(f), args...));
} else {
pool_->Schedule(std::forward<Function>(f));
}
}
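// Returns the logical id of the calling pool thread, or -1 if called from a
// thread that does not belong to the pool.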
EIGEN_STRONG_INLINE int currentThreadId() const { return pool_->CurrentThreadId(); }
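// parallelFor executes f over the range [0, n) in parallel and waits for
// completion. f accepts a half-open interval [firstIdx, lastIdx). The block
// size is chosen based on the iteration cost and the resulting parallel
// efficiency; if block_align is non-null it is invoked to round the block
// size up (e.g. to a vectorization-friendly multiple).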
void parallelFor(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
std::function<void(Index, Index)> f) const {
if (EIGEN_PREDICT_FALSE(n <= 0)) {
return;
} else if (n == 1 || numThreads() == 1 || CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
f(0, n);
return;
}
// Compute the block size and the total number of blocks.
ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
// Recursively divide the range into halves until we reach block.size.
// The midpoint is rounded to a multiple of block.size, so exactly
// block.count leaves end up doing the actual computation.
Barrier barrier(static_cast<unsigned int>(block.count));
std::function<void(Index, Index)> handleRange;
handleRange = [=, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
while (lastIdx - firstIdx > block.size) {
// Split into halves and schedule the second half on the pool.
const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, block.size) * block.size;
pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
lastIdx = midIdx;
}
// A single block (or less) is left: execute it directly.
f(firstIdx, lastIdx);
barrier.Notify();
};
if (block.count <= numThreads()) {
// Avoid a thread hop by running the root of the tree (and one block) on
// the calling thread.
handleRange(0, n);
} else {
// Execute the root in the pool to avoid running work on more than
// numThreads() threads.
pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
}
barrier.Wait();
}
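// Usage sketch (hypothetical cost figures; TensorOpCost takes bytes loaded,
// bytes stored and compute cycles per iteration):
//   device.parallelFor(n, Eigen::TensorOpCost(sizeof(float), sizeof(float), 1),
//                      [&](Eigen::Index first, Eigen::Index last) {
//                        for (Eigen::Index i = first; i < last; ++i) dst[i] = 2.0f * src[i];
//                      });
// Convenience overload without a block alignment callback: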
void parallelFor(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f) const {
parallelFor(n, cost, nullptr, std::move(f));
}
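// parallelForAsync executes f over the range [0, n) in parallel without
// waiting for completion. When the last block finishes, the 'done' callback
// is invoked. f accepts a half-open interval [firstIdx, lastIdx).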
void parallelForAsync(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
std::function<void(Index, Index)> f, std::function<void()> done) const {
if (n <= 1 || numThreads() == 1 || CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
f(0, n);
done();
return;
}
// Compute the block size and the total number of blocks.
ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
// The context owns f and done; it is deleted by the last block to finish.
ParallelForAsyncContext* const ctx = new ParallelForAsyncContext(block.count, std::move(f), std::move(done));
// Same recursive splitting scheme as in parallelFor above.
ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
while (lastIdx - firstIdx > block.size) {
// Split into halves and schedule the second half on the pool.
const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, block.size) * block.size;
pool_->Schedule([ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
lastIdx = midIdx;
}
// A single block (or less) is left: execute it directly.
ctx->f(firstIdx, lastIdx);
// Deleting the context runs the 'done' callback (see the destructor).
if (ctx->count.fetch_sub(1) == 1) delete ctx;
};
if (block.count <= numThreads()) {
// Avoid a thread hop by running the root of the tree on the calling thread.
ctx->handle_range(0, n);
} else {
// Execute the root in the pool to avoid oversubscription.
pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
}
}
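// Convenience overload of parallelForAsync without a block alignment callback.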
void parallelForAsync(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f,
std::function<void()> done) const {
parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
}
ThreadPoolInterface* getPool() const { return pool_; }
Allocator* allocator() const { return allocator_; }
private:
typedef TensorCostModel<ThreadPoolDevice> CostModel;
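// Reference-counted state shared by all blocks of a parallelForAsync call.
// The last block to finish deletes the context, whose destructor invokes the
// 'done' callback.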
struct ParallelForAsyncContext {
ParallelForAsyncContext(Index block_count, std::function<void(Index, Index)> block_f,
std::function<void()> done_callback)
: count(block_count), f(std::move(block_f)), done(std::move(done_callback)) {}
~ParallelForAsyncContext() { done(); }
std::atomic<Index> count;
std::function<void(Index, Index)> f;
std::function<void()> done;
std::function<void(Index, Index)> handle_range;
};
struct ParallelForBlock {
Index size;
Index count;
};
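// Calculates the block size based on (1) the iteration cost and (2) parallel
// efficiency. Blocks should not be too small (to limit scheduling overhead)
// nor too large (to limit tail effects and load imbalance), and the block
// count should divide as evenly as possible across the available threads.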
ParallelForBlock CalculateParallelForBlock(const Index n, const TensorOpCost& cost,
std::function<Index(Index)> block_align) const {
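// block_size_f is the (fractional) number of iterations that amount to one
// well-sized task according to the cost model.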
const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
const Index max_oversharding_factor = 4;
Index block_size = numext::mini(
n, numext::maxi<Index>(numext::div_ceil<Index>(n, max_oversharding_factor * numThreads()), block_size_f));
const Index max_block_size = numext::mini(n, 2 * block_size);
if (block_align) {
Index new_block_size = block_align(block_size);
eigen_assert(new_block_size >= block_size);
block_size = numext::mini(n, new_block_size);
}
Index block_count = numext::div_ceil(n, block_size);
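// Parallel efficiency: fraction of total thread time spent on useful work
// when block_count blocks are distributed over numThreads() threads.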
double max_efficiency =
static_cast<double>(block_count) / (numext::div_ceil<Index>(block_count, numThreads()) * numThreads());
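// Try to coarsen the block size up to max_block_size as long as this does
// not decrease parallel efficiency (within a small tolerance).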
for (Index prev_block_count = block_count; max_efficiency < 1.0 && prev_block_count > 1;) {
Index coarser_block_size = numext::div_ceil(n, prev_block_count - 1);
if (block_align) {
Index new_block_size = block_align(coarser_block_size);
eigen_assert(new_block_size >= coarser_block_size);
coarser_block_size = numext::mini(n, new_block_size);
}
if (coarser_block_size > max_block_size) {
break;
}
const Index coarser_block_count = numext::div_ceil(n, coarser_block_size);
eigen_assert(coarser_block_count < prev_block_count);
prev_block_count = coarser_block_count;
const double coarser_efficiency = static_cast<double>(coarser_block_count) /
(numext::div_ceil<Index>(coarser_block_count, numThreads()) * numThreads());
if (coarser_efficiency + 0.01 >= max_efficiency) {
block_size = coarser_block_size;
block_count = coarser_block_count;
if (max_efficiency < coarser_efficiency) {
max_efficiency = coarser_efficiency;
}
}
}
return {block_size, block_count};
}
ThreadPoolInterface* pool_;
int num_threads_;
Allocator* allocator_;
};
}  // namespace Eigen
#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H