chromium/third_party/eigen3/src/Eigen/src/Core/products/Parallelizer.h

// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2010 Gael Guennebaud <[email protected]>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_PARALLELIZER_H
#define EIGEN_PARALLELIZER_H

// IWYU pragma: private
#include "../InternalHeaderCheck.h"

// Note that in the following, there are 3 different uses of the concept
// "number of threads":
//  1. Max number of threads used by OpenMP or ThreadPool.
//     * For OpenMP this is typically the value set by the OMP_NUM_THREADS
//       environment variable, or by a call to omp_set_num_threads() prior to
//       calling Eigen.
//     * For ThreadPool, this is the number of threads in the ThreadPool.
//  2. Max number of threads currently allowed to be used by parallel Eigen
//     operations. This is set by setNbThreads(), and cannot exceed the value
//     in 1.
//  3. The actual number of threads used for a given parallel Eigen operation.
//     This is typically computed on the fly using a cost model and cannot exceed
//     the value in 2.
//     * For OpenMP, this is typically the number of threads specified in individual
//       "omp parallel" pragmas associated with an Eigen operation.
//     * For ThreadPool, it is the number of concurrent tasks scheduled in the
//       threadpool for a given Eigen operation. Notice that since the threadpool
//       uses task stealing, there is no way to limit the number of concurrently
//       executing tasks to below the number in 1. except by limiting the total
//       number of tasks in flight.

#if defined(EIGEN_HAS_OPENMP) && defined(EIGEN_GEMM_THREADPOOL)
#error "EIGEN_HAS_OPENMP and EIGEN_GEMM_THREADPOOL may not both be defined."
#endif

namespace Eigen {

namespace internal {
inline void manage_multi_threading(Action action, int* v);
}

// Public APIs.

/** Must be call first when calling Eigen from multiple threads */
EIGEN_DEPRECATED inline void initParallel() {}

/** \returns the max number of threads reserved for Eigen
 * \sa setNbThreads */
inline int nbThreads() {}

/** Sets the max number of threads reserved for Eigen
 * \sa nbThreads */
inline void setNbThreads(int v) {}

#ifdef EIGEN_GEMM_THREADPOOL
// Sets the ThreadPool used by Eigen parallel Gemm.
//
// NOTICE: This function has a known race condition with
// parallelize_gemm below, and should not be called while
// an instance of that function is running.
//
// TODO(rmlarsen): Make the device API available instead of
// storing a local static pointer variable to avoid this issue.
inline ThreadPool* setGemmThreadPool(ThreadPool* new_pool) {
  static ThreadPool* pool;
  if (new_pool != nullptr) {
    // This will wait for work in all threads in *pool to finish,
    // then destroy the old ThreadPool, and then replace it with new_pool.
    pool = new_pool;
    // Reset the number of threads to the number of threads on the new pool.
    setNbThreads(pool->NumThreads());
  }
  return pool;
}

// Gets the ThreadPool used by Eigen parallel Gemm.
inline ThreadPool* getGemmThreadPool() { return setGemmThreadPool(nullptr); }
#endif

namespace internal {

// Implementation.

#if defined(EIGEN_USE_BLAS) || (!defined(EIGEN_HAS_OPENMP) && !defined(EIGEN_GEMM_THREADPOOL))

inline void manage_multi_threading(Action action, int* v) {}
template <typename Index>
struct GemmParallelInfo {};
template <bool Condition, typename Functor, typename Index>
EIGEN_STRONG_INLINE void parallelize_gemm(const Functor& func, Index rows, Index cols, Index /*unused*/,
                                          bool /*unused*/) {}

#else

template <typename Index>
struct GemmParallelTaskInfo {
  GemmParallelTaskInfo() : sync(-1), users(0), lhs_start(0), lhs_length(0) {}
  std::atomic<Index> sync;
  std::atomic<int> users;
  Index lhs_start;
  Index lhs_length;
};

template <typename Index>
struct GemmParallelInfo {
  const int logical_thread_id;
  const int num_threads;
  GemmParallelTaskInfo<Index>* task_info;

  GemmParallelInfo(int logical_thread_id_, int num_threads_, GemmParallelTaskInfo<Index>* task_info_)
      : logical_thread_id(logical_thread_id_), num_threads(num_threads_), task_info(task_info_) {}
};

inline void manage_multi_threading(Action action, int* v) {
  static int m_maxThreads = -1;
  if (action == SetAction) {
    eigen_internal_assert(v != nullptr);
#if defined(EIGEN_HAS_OPENMP)
    // Calling action == SetAction and *v = 0 means
    // restoring m_maxThreads to the maximum number of threads specified
    // for OpenMP.
    eigen_internal_assert(*v >= 0);
    int omp_threads = omp_get_max_threads();
    m_maxThreads = (*v == 0 ? omp_threads : std::min(*v, omp_threads));
#elif defined(EIGEN_GEMM_THREADPOOL)
    // Calling action == SetAction and *v = 0 means
    // restoring m_maxThreads to the number of threads in the ThreadPool,
    // which defaults to 1 if no pool was provided.
    eigen_internal_assert(*v >= 0);
    ThreadPool* pool = getGemmThreadPool();
    int pool_threads = pool != nullptr ? pool->NumThreads() : 1;
    m_maxThreads = (*v == 0 ? pool_threads : numext::mini(pool_threads, *v));
#endif
  } else if (action == GetAction) {
    eigen_internal_assert(v != nullptr);
    *v = m_maxThreads;
  } else {
    eigen_internal_assert(false);
  }
}

template <bool Condition, typename Functor, typename Index>
EIGEN_STRONG_INLINE void parallelize_gemm(const Functor& func, Index rows, Index cols, Index depth, bool transpose) {
  // Dynamically check whether we should even try to execute in parallel.
  // The conditions are:
  // - the max number of threads we can create is greater than 1
  // - we are not already in a parallel code
  // - the sizes are large enough

  // compute the maximal number of threads from the size of the product:
  // This first heuristic takes into account that the product kernel is fully optimized when working with nr columns at
  // once.
  Index size = transpose ? rows : cols;
  Index pb_max_threads = std::max<Index>(1, size / Functor::Traits::nr);

  // compute the maximal number of threads from the total amount of work:
  double work = static_cast<double>(rows) * static_cast<double>(cols) * static_cast<double>(depth);
  double kMinTaskSize = 50000;  // FIXME improve this heuristic.
  pb_max_threads = std::max<Index>(1, std::min<Index>(pb_max_threads, static_cast<Index>(work / kMinTaskSize)));

  // compute the number of threads we are going to use
  int threads = std::min<int>(nbThreads(), static_cast<int>(pb_max_threads));

  // if multi-threading is explicitly disabled, not useful, or if we already are
  // inside a parallel session, then abort multi-threading
  bool dont_parallelize = (!Condition) || (threads <= 1);
#if defined(EIGEN_HAS_OPENMP)
  // don't parallelize if we are executing in a parallel context already.
  dont_parallelize |= omp_get_num_threads() > 1;
#elif defined(EIGEN_GEMM_THREADPOOL)
  // don't parallelize if we have a trivial threadpool or the current thread id
  // is != -1, indicating that we are already executing on a thread inside the pool.
  // In other words, we do not allow nested parallelism, since this would lead to
  // deadlocks due to the workstealing nature of the threadpool.
  ThreadPool* pool = getGemmThreadPool();
  dont_parallelize |= (pool == nullptr || pool->CurrentThreadId() != -1);
#endif
  if (dont_parallelize) return func(0, rows, 0, cols);

  func.initParallelSession(threads);

  if (transpose) std::swap(rows, cols);

  ei_declare_aligned_stack_constructed_variable(GemmParallelTaskInfo<Index>, task_info, threads, 0);

#if defined(EIGEN_HAS_OPENMP)
#pragma omp parallel num_threads(threads)
  {
    Index i = omp_get_thread_num();
    // Note that the actual number of threads might be lower than the number of
    // requested ones
    Index actual_threads = omp_get_num_threads();
    GemmParallelInfo<Index> info(i, static_cast<int>(actual_threads), task_info);

    Index blockCols = (cols / actual_threads) & ~Index(0x3);
    Index blockRows = (rows / actual_threads);
    blockRows = (blockRows / Functor::Traits::mr) * Functor::Traits::mr;

    Index r0 = i * blockRows;
    Index actualBlockRows = (i + 1 == actual_threads) ? rows - r0 : blockRows;

    Index c0 = i * blockCols;
    Index actualBlockCols = (i + 1 == actual_threads) ? cols - c0 : blockCols;

    info.task_info[i].lhs_start = r0;
    info.task_info[i].lhs_length = actualBlockRows;

    if (transpose)
      func(c0, actualBlockCols, 0, rows, &info);
    else
      func(0, rows, c0, actualBlockCols, &info);
  }

#elif defined(EIGEN_GEMM_THREADPOOL)
  ei_declare_aligned_stack_constructed_variable(GemmParallelTaskInfo<Index>, meta_info, threads, 0);
  Barrier barrier(threads);
  auto task = [=, &func, &barrier, &task_info](int i) {
    Index actual_threads = threads;
    GemmParallelInfo<Index> info(i, static_cast<int>(actual_threads), task_info);
    Index blockCols = (cols / actual_threads) & ~Index(0x3);
    Index blockRows = (rows / actual_threads);
    blockRows = (blockRows / Functor::Traits::mr) * Functor::Traits::mr;

    Index r0 = i * blockRows;
    Index actualBlockRows = (i + 1 == actual_threads) ? rows - r0 : blockRows;

    Index c0 = i * blockCols;
    Index actualBlockCols = (i + 1 == actual_threads) ? cols - c0 : blockCols;

    info.task_info[i].lhs_start = r0;
    info.task_info[i].lhs_length = actualBlockRows;

    if (transpose)
      func(c0, actualBlockCols, 0, rows, &info);
    else
      func(0, rows, c0, actualBlockCols, &info);

    barrier.Notify();
  };
  // Notice that we do not schedule more than "threads" tasks, which allows us to
  // limit number of running threads, even if the threadpool itself was constructed
  // with a larger number of threads.
  for (int i = 0; i < threads - 1; ++i) {
    pool->Schedule([=, task = std::move(task)] { task(i); });
  }
  task(threads - 1);
  barrier.Wait();
#endif
}

#endif

}  // end namespace internal
}  // end namespace Eigen

#endif  // EIGEN_PARALLELIZER_H