// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/browser/service_manager_connection.h"
#include <map>
#include <queue>
#include <utility>
#include <vector>
#include "base/functional/bind.h"
#include "base/no_destructor.h"
#include "base/task/current_thread.h"
#include "base/task/sequenced_task_runner.h"
#include "base/thread_annotations.h"
#include "base/threading/thread_checker.h"
#include "mojo/public/cpp/bindings/pending_receiver.h"
#include "mojo/public/cpp/bindings/pending_remote.h"
#include "mojo/public/cpp/system/message_pipe.h"
#include "services/service_manager/public/cpp/service.h"
#include "services/service_manager/public/cpp/service_receiver.h"
#include "services/service_manager/public/mojom/constants.mojom.h"
#include "services/service_manager/public/mojom/interface_provider.mojom.h"
namespace chromecast {
namespace {
std::unique_ptr<ServiceManagerConnection>& GetConnectionForProcess() {
static base::NoDestructor<std::unique_ptr<ServiceManagerConnection>>
connection;
return *connection;
}
} // namespace
// A ref-counted object which owns the IO thread state of a
// ServiceManagerConnection. This includes Service and ServiceFactory
// bindings.
class ServiceManagerConnection::IOThreadContext
: public base::RefCountedThreadSafe<IOThreadContext>,
public service_manager::Service {
public:
IOThreadContext(
mojo::PendingReceiver<service_manager::mojom::Service> service_receiver,
scoped_refptr<base::SequencedTaskRunner> io_task_runner,
mojo::PendingReceiver<service_manager::mojom::Connector>
connector_receiver)
: pending_service_receiver_(std::move(service_receiver)),
io_task_runner_(io_task_runner),
pending_connector_receiver_(std::move(connector_receiver)) {
// This will be reattached by any of the IO thread functions on first call.
io_thread_checker_.DetachFromThread();
}
IOThreadContext(const IOThreadContext&) = delete;
IOThreadContext& operator=(const IOThreadContext&) = delete;
// Safe to call from any thread.
void Start() {
DCHECK(!started_);
started_ = true;
io_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&IOThreadContext::StartOnIOThread, this));
}
// Safe to call from whichever thread called Start() (or may have called
// Start()). Must be called before IO thread shutdown.
void ShutDown() {
if (!started_)
return;
bool posted = io_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&IOThreadContext::ShutDownOnIOThread, this));
DCHECK(posted);
}
private:
friend class base::RefCountedThreadSafe<IOThreadContext>;
class MessageLoopObserver : public base::CurrentThread::DestructionObserver {
public:
explicit MessageLoopObserver(base::WeakPtr<IOThreadContext> context)
: context_(context) {
base::CurrentThread::Get()->AddDestructionObserver(this);
}
MessageLoopObserver(const MessageLoopObserver&) = delete;
MessageLoopObserver& operator=(const MessageLoopObserver&) = delete;
~MessageLoopObserver() override {
base::CurrentThread::Get()->RemoveDestructionObserver(this);
}
void ShutDown() {
if (!is_active_)
return;
// The call into |context_| below may reenter ShutDown(), hence we set
// |is_active_| to false here.
is_active_ = false;
if (context_)
context_->ShutDownOnIOThread();
delete this;
}
private:
void WillDestroyCurrentMessageLoop() override {
DCHECK(is_active_);
ShutDown();
}
bool is_active_ = true;
base::WeakPtr<IOThreadContext> context_;
};
~IOThreadContext() override {}
void StartOnIOThread() {
// Should bind |io_thread_checker_| to the context's thread.
DCHECK(io_thread_checker_.CalledOnValidThread());
service_receiver_ = std::make_unique<service_manager::ServiceReceiver>(
this, std::move(pending_service_receiver_));
service_receiver_->GetConnector()->BindConnectorReceiver(
std::move(pending_connector_receiver_));
// MessageLoopObserver owns itself.
message_loop_observer_ =
new MessageLoopObserver(weak_factory_.GetWeakPtr());
}
void ShutDownOnIOThread() {
DCHECK(io_thread_checker_.CalledOnValidThread());
weak_factory_.InvalidateWeakPtrs();
// Note that this method may be invoked by MessageLoopObserver observing
// MessageLoop destruction. In that case, this call to ShutDown is
// effectively a no-op. In any case it's safe.
if (message_loop_observer_) {
message_loop_observer_->ShutDown();
message_loop_observer_ = nullptr;
}
// Resetting the ServiceContext below may otherwise release the last
// reference to this IOThreadContext. We keep it alive until the stack
// unwinds.
scoped_refptr<IOThreadContext> keepalive(this);
service_receiver_.reset();
}
/////////////////////////////////////////////////////////////////////////////
// service_manager::Service implementation
void OnBindInterface(const service_manager::BindSourceInfo& source_info,
const std::string& interface_name,
mojo::ScopedMessagePipeHandle interface_pipe) override {}
base::ThreadChecker io_thread_checker_;
bool started_ = false;
// Temporary state established on construction and consumed on the IO thread
// once the connection is started.
mojo::PendingReceiver<service_manager::mojom::Service>
pending_service_receiver_;
scoped_refptr<base::SequencedTaskRunner> io_task_runner_;
mojo::PendingReceiver<service_manager::mojom::Connector>
pending_connector_receiver_;
// TaskRunner on which to run our owner's callbacks, i.e. the ones passed to
// Start().
std::unique_ptr<service_manager::ServiceReceiver> service_receiver_;
// Not owned.
MessageLoopObserver* message_loop_observer_ = nullptr;
base::WeakPtrFactory<IOThreadContext> weak_factory_{this};
};
// static
void ServiceManagerConnection::SetForProcess(
std::unique_ptr<ServiceManagerConnection> connection) {
DCHECK(!GetConnectionForProcess());
GetConnectionForProcess() = std::move(connection);
}
// static
ServiceManagerConnection* ServiceManagerConnection::GetForProcess() {
return GetConnectionForProcess().get();
}
// static
void ServiceManagerConnection::DestroyForProcess() {
// This joins the service manager controller thread.
GetConnectionForProcess().reset();
}
// static
std::unique_ptr<ServiceManagerConnection> ServiceManagerConnection::Create(
mojo::PendingReceiver<service_manager::mojom::Service> receiver,
scoped_refptr<base::SequencedTaskRunner> io_task_runner) {
return std::make_unique<ServiceManagerConnection>(std::move(receiver),
io_task_runner);
}
ServiceManagerConnection::ServiceManagerConnection(
mojo::PendingReceiver<service_manager::mojom::Service> receiver,
scoped_refptr<base::SequencedTaskRunner> io_task_runner) {
mojo::PendingReceiver<service_manager::mojom::Connector> connector_receiver;
connector_ = service_manager::Connector::Create(&connector_receiver);
context_ = new IOThreadContext(std::move(receiver), io_task_runner,
std::move(connector_receiver));
}
ServiceManagerConnection::~ServiceManagerConnection() {
context_->ShutDown();
}
void ServiceManagerConnection::Start() {
context_->Start();
}
service_manager::Connector* ServiceManagerConnection::GetConnector() {
return connector_.get();
}
} // namespace chromecast