chromium/base/task/thread_pool/pooled_single_thread_task_runner_manager.cc

// Copyright 2017 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifdef UNSAFE_BUFFERS_BUILD
// TODO(crbug.com/40284755): Remove this and spanify to fix the errors.
#pragma allow_unsafe_buffers
#endif

#include "base/task/thread_pool/pooled_single_thread_task_runner_manager.h"

#include <memory>
#include <string>
#include <utility>

#include "base/check.h"
#include "base/debug/leak_annotations.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/message_loop/message_pump.h"
#include "base/ranges/algorithm.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/atomic_flag.h"
#include "base/task/default_delayed_task_handle_delegate.h"
#include "base/task/single_thread_task_runner.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/delayed_task_manager.h"
#include "base/task/thread_pool/priority_queue.h"
#include "base/task/thread_pool/sequence.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/task_tracker.h"
#include "base/task/thread_pool/worker_thread.h"
#include "base/threading/platform_thread.h"
#include "base/time/time.h"
#include "build/build_config.h"

#if BUILDFLAG(IS_WIN)
#include <windows.h>

#include "base/win/scoped_com_initializer.h"
#endif  // BUILDFLAG(IS_WIN)

namespace base {
namespace internal {

namespace {

// Boolean indicating whether there's a PooledSingleThreadTaskRunnerManager
// instance alive in this process. This variable should only be set when the
// PooledSingleThreadTaskRunnerManager instance is brought up (on the main
// thread; before any tasks are posted) and cleared when the instance is
// brought down (i.e., only when unit tests tear down the task environment and
// never in production). This makes the variable const while worker threads are
// up and as such it doesn't need to be atomic. It is used to tell when a task
// is posted from the main thread after the task environment was brought down in
// unit tests so that PooledSingleThreadTaskRunnerManager bound TaskRunners
// can return false on PostTask, letting such callers know they should complete
// necessary work synchronously. Note: |!g_manager_is_alive| is generally
// equivalent to |!ThreadPoolInstance::Get()| but has the advantage of being
// valid in thread_pool unit tests that don't instantiate a full
// thread pool.
bool g_manager_is_alive =;

bool g_use_utility_thread_group =;

size_t GetEnvironmentIndexForTraits(const TaskTraits& traits) {}

// Allows for checking the PlatformThread::CurrentRef() against a set
// PlatformThreadRef atomically without using locks.
class AtomicThreadRefChecker {};

class WorkerThreadDelegate : public WorkerThread::Delegate {};

#if BUILDFLAG(IS_WIN)

class WorkerThreadCOMDelegate : public WorkerThreadDelegate {
 public:
  WorkerThreadCOMDelegate(const std::string& thread_name,
                          WorkerThread::ThreadLabel thread_label,
                          TrackedRef<TaskTracker> task_tracker)
      : WorkerThreadDelegate(thread_name,
                             thread_label,
                             std::move(task_tracker)) {}

  WorkerThreadCOMDelegate(const WorkerThreadCOMDelegate&) = delete;
  WorkerThreadCOMDelegate& operator=(const WorkerThreadCOMDelegate&) = delete;
  ~WorkerThreadCOMDelegate() override { DCHECK(!scoped_com_initializer_); }

  // WorkerThread::Delegate:
  void OnMainEntry(WorkerThread* worker) override {
    WorkerThreadDelegate::OnMainEntry(worker);

    scoped_com_initializer_ = std::make_unique<win::ScopedCOMInitializer>();

    // CHECK to make sure this COM thread is initialized correctly in an STA.
    CHECK(scoped_com_initializer_->Succeeded());
  }

  RegisteredTaskSource GetWork(WorkerThread* worker) override {
    // This scheme below allows us to cover the following scenarios:
    // * Only WorkerThreadDelegate::GetWork() has work:
    //   Always return the task source from GetWork().
    // * Only the Windows Message Queue has work:
    //   Always return the task source from GetWorkFromWindowsMessageQueue();
    // * Both WorkerThreadDelegate::GetWork() and the Windows Message Queue
    //   have work:
    //   Process task sources from each source round-robin style.
    CheckedAutoLock auto_lock(lock_);

    // |worker_awake_| is always set before a call to WakeUp(), but it is
    // not set when messages are added to the Windows Message Queue. Ensure that
    // it is set before getting work, to avoid unnecessary wake ups.
    //
    // Note: It wouldn't be sufficient to set |worker_awake_| in WaitForWork()
    // when MsgWaitForMultipleObjectsEx() indicates that it was woken up by a
    // Windows Message, because of the following scenario:
    //  T1: PostTask
    //      Queue task
    //      Set |worker_awake_| to true
    //  T2: Woken up by a Windows Message
    //      Set |worker_awake_| to true
    //      Run the task posted by T1
    //      Wait for work
    //  T1: WakeUp()
    //  T2: Woken up by Waitable Event
    //      Does not set |worker_awake_| (wake up not from Windows Message)
    //      GetWork
    //      !! Getting work while |worker_awake_| is false !!
    worker_awake_ = true;
    RegisteredTaskSource task_source;
    if (get_work_first_) {
      task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
      if (task_source)
        get_work_first_ = false;
    }

    if (!task_source) {
      CheckedAutoUnlock auto_unlock(lock_);
      task_source = GetWorkFromWindowsMessageQueue();
      if (task_source)
        get_work_first_ = true;
    }

    if (!task_source && !get_work_first_) {
      // This case is important if we checked the Windows Message Queue first
      // and found there was no work. We don't want to return null immediately
      // as that could cause the thread to go to sleep while work is waiting via
      // WorkerThreadDelegate::GetWork().
      task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
    }
    if (!task_source) {
      // The worker will sleep after this returns nullptr.
      worker_awake_ = false;
      return nullptr;
    }
    auto run_status = task_source.WillRunTask();
    DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
    return task_source;
  }

  void OnMainExit(WorkerThread* /* worker */) override {
    scoped_com_initializer_.reset();
  }

  void WaitForWork() override {
    const TimeDelta sleep_time = GetSleepTimeout();
    const DWORD milliseconds_wait = checked_cast<DWORD>(
        sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds());
    const HANDLE wake_up_event_handle = wake_up_event_.handle();
    MsgWaitForMultipleObjectsEx(1, &wake_up_event_handle, milliseconds_wait,
                                QS_ALLINPUT, 0);
  }

 private:
  RegisteredTaskSource GetWorkFromWindowsMessageQueue() {
    MSG msg;
    if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) {
      Task pump_message_task(FROM_HERE,
                             BindOnce(
                                 [](MSG msg) {
                                   TranslateMessage(&msg);
                                   DispatchMessage(&msg);
                                 },
                                 std::move(msg)),
                             TimeTicks::Now(), TimeDelta());
      if (task_tracker()->WillPostTask(
              &pump_message_task, TaskShutdownBehavior::SKIP_ON_SHUTDOWN)) {
        auto transaction = message_pump_sequence_->BeginTransaction();
        const bool sequence_should_be_queued =
            transaction.WillPushImmediateTask();
        DCHECK(sequence_should_be_queued)
            << "GetWorkFromWindowsMessageQueue() does not expect "
               "queueing of pump tasks.";
        auto registered_task_source = task_tracker_->RegisterTaskSource(
            std::move(message_pump_sequence_));
        if (!registered_task_source)
          return nullptr;
        transaction.PushImmediateTask(std::move(pump_message_task));
        return registered_task_source;
      } else {
        // `pump_message_task`'s destructor may run sequence-affine code, so it
        // must be leaked when `WillPostTask` returns false.
        auto leak = std::make_unique<Task>(std::move(pump_message_task));
        ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
        leak.release();
      }
    }
    return nullptr;
  }

  bool get_work_first_ = true;
  const scoped_refptr<Sequence> message_pump_sequence_ =
      MakeRefCounted<Sequence>(TaskTraits{MayBlock()},
                               nullptr,
                               TaskSourceExecutionMode::kParallel);
  std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
};

#endif  // BUILDFLAG(IS_WIN)

}  // namespace

class PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner
    : public SingleThreadTaskRunner {};

PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunnerManager(
    TrackedRef<TaskTracker> task_tracker,
    DelayedTaskManager* delayed_task_manager)
    :{}

PooledSingleThreadTaskRunnerManager::~PooledSingleThreadTaskRunnerManager() {}

void PooledSingleThreadTaskRunnerManager::Start(
    scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer) {}

void PooledSingleThreadTaskRunnerManager::DidUpdateCanRunPolicy() {}

scoped_refptr<SingleThreadTaskRunner>
PooledSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunner(
    const TaskTraits& traits,
    SingleThreadTaskRunnerThreadMode thread_mode) {}

#if BUILDFLAG(IS_WIN)
// Creates a SingleThreadTaskRunner for |traits| and |thread_mode| whose worker
// thread is driven by a WorkerThreadCOMDelegate.
scoped_refptr<SingleThreadTaskRunner>
PooledSingleThreadTaskRunnerManager::CreateCOMSTATaskRunner(
    const TaskTraits& traits,
    SingleThreadTaskRunnerThreadMode thread_mode) {
  scoped_refptr<SingleThreadTaskRunner> com_sta_task_runner =
      CreateTaskRunnerImpl<WorkerThreadCOMDelegate>(traits, thread_mode);
  return com_sta_task_runner;
}
#endif  // BUILDFLAG(IS_WIN)

// static
PooledSingleThreadTaskRunnerManager::ContinueOnShutdown
PooledSingleThreadTaskRunnerManager::TraitsToContinueOnShutdown(
    const TaskTraits& traits) {}

template <typename DelegateType>
scoped_refptr<PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner>
PooledSingleThreadTaskRunnerManager::CreateTaskRunnerImpl(
    const TaskTraits& traits,
    SingleThreadTaskRunnerThreadMode thread_mode) {}

void PooledSingleThreadTaskRunnerManager::JoinForTesting() {}

template <>
std::unique_ptr<WorkerThreadDelegate>
PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
    WorkerThreadDelegate>(const std::string& name,
                          int id,
                          SingleThreadTaskRunnerThreadMode thread_mode) {}

#if BUILDFLAG(IS_WIN)
// Specialization for COM workers: builds a WorkerThreadCOMDelegate whose
// thread name is "ThreadPoolSingleThreadCOMSTA" followed by |name| and |id|,
// and whose label reflects whether the thread is dedicated or shared.
template <>
std::unique_ptr<WorkerThreadDelegate>
PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
    WorkerThreadCOMDelegate>(const std::string& name,
                             int id,
                             SingleThreadTaskRunnerThreadMode thread_mode) {
  const bool is_dedicated =
      thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED;
  const WorkerThread::ThreadLabel com_thread_label =
      is_dedicated ? WorkerThread::ThreadLabel::DEDICATED_COM
                   : WorkerThread::ThreadLabel::SHARED_COM;
  const std::string com_thread_name =
      StringPrintf("ThreadPoolSingleThreadCOMSTA%s%d", name.c_str(), id);
  return std::make_unique<WorkerThreadCOMDelegate>(
      com_thread_name, com_thread_label, task_tracker_);
}
#endif  // BUILDFLAG(IS_WIN)

template <typename DelegateType>
WorkerThread*
PooledSingleThreadTaskRunnerManager::CreateAndRegisterWorkerThread(
    const std::string& name,
    SingleThreadTaskRunnerThreadMode thread_mode,
    ThreadType thread_type_hint) {}

template <>
WorkerThread*&
PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
    WorkerThreadDelegate>(const TaskTraits& traits) {}

#if BUILDFLAG(IS_WIN)
// Specialization for COM workers: returns a reference to the slot in
// |shared_com_worker_threads_| selected by the environment index and the
// continue-on-shutdown value derived from |traits|.
template <>
WorkerThread*&
PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
    WorkerThreadCOMDelegate>(const TaskTraits& traits) {
  const size_t environment_index = GetEnvironmentIndexForTraits(traits);
  const ContinueOnShutdown continue_on_shutdown =
      TraitsToContinueOnShutdown(traits);
  return shared_com_worker_threads_[environment_index][continue_on_shutdown];
}
#endif  // BUILDFLAG(IS_WIN)

void PooledSingleThreadTaskRunnerManager::UnregisterWorkerThread(
    WorkerThread* worker) {}

void PooledSingleThreadTaskRunnerManager::ReleaseSharedWorkerThreads() {}

}  // namespace internal
}  // namespace base