#ifdef UNSAFE_BUFFERS_BUILD
#pragma allow_unsafe_buffers
#endif
#include "base/task/thread_pool/pooled_single_thread_task_runner_manager.h"
#include <memory>
#include <string>
#include <utility>
#include "base/check.h"
#include "base/debug/leak_annotations.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/message_loop/message_pump.h"
#include "base/ranges/algorithm.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/atomic_flag.h"
#include "base/task/default_delayed_task_handle_delegate.h"
#include "base/task/single_thread_task_runner.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/delayed_task_manager.h"
#include "base/task/thread_pool/priority_queue.h"
#include "base/task/thread_pool/sequence.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/task_tracker.h"
#include "base/task/thread_pool/worker_thread.h"
#include "base/threading/platform_thread.h"
#include "base/time/time.h"
#include "build/build_config.h"
#if BUILDFLAG(IS_WIN)
#include <windows.h>
#include "base/win/scoped_com_initializer.h"
#endif
namespace base {
namespace internal {
namespace {
bool g_manager_is_alive = …;
bool g_use_utility_thread_group = …;
size_t GetEnvironmentIndexForTraits(const TaskTraits& traits) { … }
class AtomicThreadRefChecker { … };
class WorkerThreadDelegate : public WorkerThread::Delegate { … };
#if BUILDFLAG(IS_WIN)
// WorkerThreadDelegate specialization used on Windows for workers that, in
// addition to running tasks obtained through WorkerThreadDelegate, pump the
// thread's Windows message queue. A win::ScopedCOMInitializer is held for the
// whole lifetime of the thread's main function (OnMainEntry() -> OnMainExit()).
class WorkerThreadCOMDelegate : public WorkerThreadDelegate {
 public:
  // Forwards all arguments to the WorkerThreadDelegate constructor.
  WorkerThreadCOMDelegate(const std::string& thread_name,
                          WorkerThread::ThreadLabel thread_label,
                          TrackedRef<TaskTracker> task_tracker)
      : WorkerThreadDelegate(thread_name,
                             thread_label,
                             std::move(task_tracker)) {}

  WorkerThreadCOMDelegate(const WorkerThreadCOMDelegate&) = delete;
  WorkerThreadCOMDelegate& operator=(const WorkerThreadCOMDelegate&) = delete;

  // The COM initializer must already have been released by OnMainExit().
  ~WorkerThreadCOMDelegate() override { DCHECK(!scoped_com_initializer_); }

  // WorkerThread::Delegate:

  // Runs the base class entry logic, then creates the COM initializer for this
  // thread. Initialization failure is fatal (CHECK).
  void OnMainEntry(WorkerThread* worker) override {
    WorkerThreadDelegate::OnMainEntry(worker);

    scoped_com_initializer_ = std::make_unique<win::ScopedCOMInitializer>();
    CHECK(scoped_com_initializer_->Succeeded());
  }

  // Returns the next task source to run, or null if there is none.
  //
  // Two sources of work are consulted:
  //   1. WorkerThreadDelegate::GetWorkLockRequired().
  //   2. The Windows message queue (GetWorkFromWindowsMessageQueue()).
  // |get_work_first_| records which of the two is tried first. It is flipped
  // each time a source yields work, so that when both have work they are
  // served alternately. When only one has work, that one is always returned.
  RegisteredTaskSource GetWork(WorkerThread* worker) override {
    CheckedAutoLock auto_lock(lock_);

    // Mark the worker awake before looking for work. It is reset below only on
    // the path where no work at all was found.
    worker_awake_ = true;
    RegisteredTaskSource task_source;
    if (get_work_first_) {
      task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
      if (task_source)
        get_work_first_ = false;
    }

    if (!task_source) {
      // |lock_| is released for the duration of this scope while the message
      // queue is inspected.
      CheckedAutoUnlock auto_unlock(lock_);
      task_source = GetWorkFromWindowsMessageQueue();
      if (task_source)
        get_work_first_ = true;
    }

    if (!task_source && !get_work_first_) {
      // The message queue was tried first (the WorkerThreadDelegate source was
      // skipped above) and turned out to be empty. Consult the
      // WorkerThreadDelegate source before reporting that there is no work.
      task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
    }

    if (!task_source) {
      // Nothing to run from either source.
      worker_awake_ = false;
      return nullptr;
    }

    auto run_status = task_source.WillRunTask();
    DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
    return task_source;
  }

  // Releases the COM initializer created in OnMainEntry().
  void OnMainExit(WorkerThread* /* worker */) override {
    scoped_com_initializer_.reset();
  }

  // Blocks until |wake_up_event_| is signaled, input of any kind covered by
  // QS_ALLINPUT arrives in the thread's message queue, or the sleep timeout
  // elapses. A maximal sleep timeout is translated to an INFINITE wait.
  void WaitForWork() override {
    const TimeDelta sleep_time = GetSleepTimeout();
    const DWORD milliseconds_wait = checked_cast<DWORD>(
        sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds());
    const HANDLE wake_up_event_handle = wake_up_event_.handle();
    MsgWaitForMultipleObjectsEx(1, &wake_up_event_handle, milliseconds_wait,
                                QS_ALLINPUT, 0);
  }

 private:
  // Removes at most one message from the calling thread's Windows message
  // queue and wraps its translation + dispatch in a Task pushed onto
  // |message_pump_sequence_|. Returns the registered task source for that
  // sequence, or null when the queue is empty, when the TaskTracker refuses
  // the task, or when the sequence cannot be registered.
  RegisteredTaskSource GetWorkFromWindowsMessageQueue() {
    MSG msg;
    if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) == FALSE)
      return nullptr;

    Task pump_message_task(FROM_HERE,
                           BindOnce(
                               [](MSG msg) {
                                 TranslateMessage(&msg);
                                 DispatchMessage(&msg);
                               },
                               std::move(msg)),
                           TimeTicks::Now(), TimeDelta());

    if (!task_tracker()->WillPostTask(
            &pump_message_task, TaskShutdownBehavior::SKIP_ON_SHUTDOWN)) {
      // The rejected task is intentionally leaked instead of being destroyed,
      // and annotated so that leak detection does not report it.
      // NOTE(review): the rationale is not visible in this file — presumably
      // destroying the bound callback at this point is not safe; confirm.
      auto leak = std::make_unique<Task>(std::move(pump_message_task));
      ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
      leak.release();
      return nullptr;
    }

    auto transaction = message_pump_sequence_->BeginTransaction();
    const bool sequence_should_be_queued = transaction.WillPushImmediateTask();
    DCHECK(sequence_should_be_queued)
        << "GetWorkFromWindowsMessageQueue() does not expect "
           "queueing of pump tasks.";

    // |message_pump_sequence_| is const and reused for every pumped message,
    // so a reference is shared with the TaskTracker by copy. (Wrapping it in
    // std::move() would still copy, but would read like a use-after-move.)
    auto registered_task_source =
        task_tracker_->RegisterTaskSource(message_pump_sequence_);
    if (!registered_task_source)
      return nullptr;

    transaction.PushImmediateTask(std::move(pump_message_task));
    return registered_task_source;
  }

  // True when GetWork() should try WorkerThreadDelegate::GetWorkLockRequired()
  // before the Windows message queue; false for the opposite order.
  bool get_work_first_ = true;

  // Sequence onto which each message-pump task is pushed.
  const scoped_refptr<Sequence> message_pump_sequence_ =
      MakeRefCounted<Sequence>(TaskTraits{MayBlock()},
                               nullptr,
                               TaskSourceExecutionMode::kParallel);

  // Non-null between OnMainEntry() and OnMainExit().
  std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
};
#endif
}
class PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner
: public SingleThreadTaskRunner { … };
PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunnerManager(
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager)
: … { … }
PooledSingleThreadTaskRunnerManager::~PooledSingleThreadTaskRunnerManager() { … }
void PooledSingleThreadTaskRunnerManager::Start(
scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
WorkerThreadObserver* worker_thread_observer) { … }
void PooledSingleThreadTaskRunnerManager::DidUpdateCanRunPolicy() { … }
scoped_refptr<SingleThreadTaskRunner>
PooledSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunner(
const TaskTraits& traits,
SingleThreadTaskRunnerThreadMode thread_mode) { … }
#if BUILDFLAG(IS_WIN)
// Creates a SingleThreadTaskRunner for |traits| and |thread_mode| by
// delegating to CreateTaskRunnerImpl() instantiated with
// WorkerThreadCOMDelegate.
scoped_refptr<SingleThreadTaskRunner>
PooledSingleThreadTaskRunnerManager::CreateCOMSTATaskRunner(
    const TaskTraits& traits,
    SingleThreadTaskRunnerThreadMode thread_mode) {
  scoped_refptr<SingleThreadTaskRunner> com_sta_runner =
      CreateTaskRunnerImpl<WorkerThreadCOMDelegate>(traits, thread_mode);
  return com_sta_runner;
}
#endif
PooledSingleThreadTaskRunnerManager::ContinueOnShutdown
PooledSingleThreadTaskRunnerManager::TraitsToContinueOnShutdown(
const TaskTraits& traits) { … }
template <typename DelegateType>
scoped_refptr<PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner>
PooledSingleThreadTaskRunnerManager::CreateTaskRunnerImpl(
const TaskTraits& traits,
SingleThreadTaskRunnerThreadMode thread_mode) { … }
void PooledSingleThreadTaskRunnerManager::JoinForTesting() { … }
template <>
std::unique_ptr<WorkerThreadDelegate>
PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
WorkerThreadDelegate>(const std::string& name,
int id,
SingleThreadTaskRunnerThreadMode thread_mode) { … }
#if BUILDFLAG(IS_WIN)
// Specialization that builds a WorkerThreadCOMDelegate. The thread name is
// "ThreadPoolSingleThreadCOMSTA" followed by |name| and |id|; the thread label
// is DEDICATED_COM for DEDICATED |thread_mode| and SHARED_COM otherwise.
template <>
std::unique_ptr<WorkerThreadDelegate>
PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
    WorkerThreadCOMDelegate>(const std::string& name,
                             int id,
                             SingleThreadTaskRunnerThreadMode thread_mode) {
  const std::string com_thread_name =
      StringPrintf("ThreadPoolSingleThreadCOMSTA%s%d", name.c_str(), id);

  const bool is_dedicated =
      thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED;
  const WorkerThread::ThreadLabel com_thread_label =
      is_dedicated ? WorkerThread::ThreadLabel::DEDICATED_COM
                   : WorkerThread::ThreadLabel::SHARED_COM;

  return std::make_unique<WorkerThreadCOMDelegate>(
      com_thread_name, com_thread_label, task_tracker_);
}
#endif
template <typename DelegateType>
WorkerThread*
PooledSingleThreadTaskRunnerManager::CreateAndRegisterWorkerThread(
const std::string& name,
SingleThreadTaskRunnerThreadMode thread_mode,
ThreadType thread_type_hint) { … }
template <>
WorkerThread*&
PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
WorkerThreadDelegate>(const TaskTraits& traits) { … }
#if BUILDFLAG(IS_WIN)
// Specialization for WorkerThreadCOMDelegate: returns a reference to the slot
// in |shared_com_worker_threads_| selected by the environment index and the
// continue-on-shutdown value derived from |traits|.
template <>
WorkerThread*&
PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
    WorkerThreadCOMDelegate>(const TaskTraits& traits) {
  const size_t environment_index = GetEnvironmentIndexForTraits(traits);
  const auto continue_on_shutdown = TraitsToContinueOnShutdown(traits);
  return shared_com_worker_threads_[environment_index][continue_on_shutdown];
}
#endif
void PooledSingleThreadTaskRunnerManager::UnregisterWorkerThread(
WorkerThread* worker) { … }
void PooledSingleThreadTaskRunnerManager::ReleaseSharedWorkerThreads() { … }
}
}