// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromecast/cast_core/runtime/browser/runtime_service_impl.h"
#include "base/check.h"
#include "base/command_line.h"
#include "base/functional/callback_helpers.h"
#include "base/logging.h"
#include "base/task/bind_post_task.h"
#include "base/task/sequenced_task_runner.h"
#include "base/time/time.h"
#include "chromecast/browser/cast_web_service.h"
#include "chromecast/cast_core/cast_core_switches.h"
#include "chromecast/cast_core/runtime/browser/core_conversions.h"
#include "chromecast/cast_core/runtime/browser/runtime_application_service_impl.h"
#include "chromecast/metrics/cast_event_builder_simple.h"
#include "components/cast_receiver/browser/public/content_browser_client_mixins.h"
#include "components/cast_receiver/browser/public/embedder_application.h"
#include "third_party/cast_core/public/src/proto/common/application_config.pb.h"
namespace chromecast {
namespace {
constexpr base::TimeDelta kDefaultMetricsReportInterval = base::Seconds(60);
} // namespace
RuntimeServiceImpl::RuntimeServiceImpl(
cast_receiver::ContentBrowserClientMixins& browser_mixins,
CastWebService& web_service)
: application_dispatcher_(
browser_mixins
.CreateApplicationDispatcher<RuntimeApplicationServiceImpl>()),
task_runner_(base::SequencedTaskRunner::GetCurrentDefault()),
web_service_(web_service),
metrics_recorder_(this) {
heartbeat_timer_.SetTaskRunner(task_runner_);
metrics_recorder_service_.emplace(
&metrics_recorder_, &action_recorder_,
base::BindRepeating(&RuntimeServiceImpl::RecordMetrics,
weak_factory_.GetWeakPtr()),
kDefaultMetricsReportInterval);
}
RuntimeServiceImpl::~RuntimeServiceImpl() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
Stop();
}
cast_receiver::Status RuntimeServiceImpl::Start() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
auto* command_line = base::CommandLine::ForCurrentProcess();
std::string runtime_id =
command_line->GetSwitchValueASCII(cast::core::kCastCoreRuntimeIdSwitch);
std::string runtime_service_path =
command_line->GetSwitchValueASCII(cast::core::kRuntimeServicePathSwitch);
return Start(runtime_id, runtime_service_path);
}
cast_receiver::Status RuntimeServiceImpl::Start(
std::string_view runtime_id,
std::string_view runtime_service_endpoint) {
CHECK(!grpc_server_);
CHECK(!runtime_id.empty());
CHECK(!runtime_service_endpoint.empty());
LOG(INFO) << "Starting runtime service: runtime_id=" << runtime_id
<< ", endpoint=" << runtime_service_endpoint;
grpc_server_.emplace();
grpc_server_
->SetHandler<cast::runtime::RuntimeServiceHandler::LoadApplication>(
base::BindPostTask(
task_runner_,
base::BindRepeating(&RuntimeServiceImpl::HandleLoadApplication,
weak_factory_.GetWeakPtr())));
grpc_server_
->SetHandler<cast::runtime::RuntimeServiceHandler::LaunchApplication>(
base::BindPostTask(
task_runner_,
base::BindRepeating(&RuntimeServiceImpl::HandleLaunchApplication,
weak_factory_.GetWeakPtr())));
grpc_server_
->SetHandler<cast::runtime::RuntimeServiceHandler::StopApplication>(
base::BindPostTask(
task_runner_,
base::BindRepeating(&RuntimeServiceImpl::HandleStopApplication,
weak_factory_.GetWeakPtr())));
grpc_server_->SetHandler<cast::runtime::RuntimeServiceHandler::Heartbeat>(
base::BindPostTask(task_runner_, base::BindRepeating(
&RuntimeServiceImpl::HandleHeartbeat,
weak_factory_.GetWeakPtr())));
grpc_server_
->SetHandler<cast::runtime::RuntimeServiceHandler::StartMetricsRecorder>(
base::BindPostTask(
task_runner_, base::BindRepeating(
&RuntimeServiceImpl::HandleStartMetricsRecorder,
weak_factory_.GetWeakPtr())));
grpc_server_
->SetHandler<cast::runtime::RuntimeServiceHandler::StopMetricsRecorder>(
base::BindPostTask(task_runner_,
base::BindRepeating(
&RuntimeServiceImpl::HandleStopMetricsRecorder,
weak_factory_.GetWeakPtr())));
auto status = grpc_server_->Start(std::string(runtime_service_endpoint));
// Browser runtime must crash if the runtime service failed to start to avoid
// the process to dangle without any proper connection to the Cast Core.
CHECK(status.ok()) << "Failed to start runtime service: status="
<< status.error_message();
LOG(INFO) << "Runtime service started";
return cast_receiver::OkStatus();
}
void RuntimeServiceImpl::ResetGrpcServices() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (heartbeat_reactor_) {
heartbeat_timer_.Stop();
// Reset the writes callback as we're not expecting any more responses from
// gRPC framework.
heartbeat_reactor_->SetWritesAvailableCallback(base::DoNothing());
heartbeat_reactor_->Write(grpc::Status::OK);
heartbeat_reactor_ = nullptr;
LOG(INFO) << "Heartbeat reactor is reset";
}
heartbeat_timer_.Stop();
metrics_recorder_stub_.reset();
LOG(INFO) << "Pending apps and Core services terminated";
}
cast_receiver::Status RuntimeServiceImpl::Stop() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
ResetGrpcServices();
if (metrics_recorder_service_) {
metrics_recorder_service_->OnCloseSoon(base::DoNothing());
metrics_recorder_service_.reset();
}
if (grpc_server_) {
grpc_server_->Stop();
grpc_server_.reset();
}
LOG(INFO) << "Runtime service stopped";
return cast_receiver::OkStatus();
}
std::unique_ptr<CastEventBuilder> RuntimeServiceImpl::CreateEventBuilder() {
return std::make_unique<CastEventBuilderSimple>();
}
void RuntimeServiceImpl::HandleLoadApplication(
cast::runtime::LoadApplicationRequest request,
cast::runtime::RuntimeServiceHandler::LoadApplication::Reactor* reactor) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!grpc_server_) {
// gRPC server has been shut down and all reactors have been cancelled.
return;
}
if (request.cast_session_id().empty()) {
LOG(ERROR) << "Session ID is empty";
reactor->Write(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"Application session ID is missing"));
return;
}
if (application_dispatcher_->GetApplication(request.cast_session_id())) {
reactor->Write(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
"Application already exist"));
return;
}
if (!request.has_application_config()) {
reactor->Write(
grpc::Status(grpc::INVALID_ARGUMENT, "Application config is missing"));
return;
}
LOG(INFO) << "Loading application: session_id=" << request.cast_session_id();
RuntimeApplicationServiceImpl* platform_app =
application_dispatcher_->CreateApplication(
request.cast_session_id(),
ToReceiverConfig(request.application_config()),
base::BindOnce(
[](scoped_refptr<base::SequencedTaskRunner> task_runner,
cast::common::ApplicationConfig config,
CastWebService& web_service,
std::unique_ptr<cast_receiver::RuntimeApplication>
runtime_application) {
return std::make_unique<RuntimeApplicationServiceImpl>(
std::move(runtime_application), std::move(config),
std::move(task_runner), web_service);
},
task_runner_, std::move(request.application_config()),
std::ref(*web_service_)));
platform_app->Load(
request,
base::BindPostTask(
task_runner_,
base::BindOnce(&RuntimeServiceImpl::OnApplicationLoaded,
weak_factory_.GetWeakPtr(), request.cast_session_id(),
std::move(reactor))));
}
void RuntimeServiceImpl::HandleLaunchApplication(
cast::runtime::LaunchApplicationRequest request,
cast::runtime::RuntimeServiceHandler::LaunchApplication::Reactor* reactor) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!grpc_server_) {
// gRPC server has been shut down and all reactors have been cancelled.
return;
}
if (request.cast_session_id().empty()) {
LOG(ERROR) << "Session id is empty";
reactor->Write(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"Sesssion id is missing"));
return;
}
RuntimeApplicationServiceImpl* platform_app =
application_dispatcher_->GetApplication(request.cast_session_id());
if (!platform_app) {
LOG(ERROR) << "Application does not exist";
reactor->Write(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"Application does not exists"));
return;
}
LOG(INFO) << "Launching application: session_id="
<< request.cast_session_id();
platform_app->Launch(
request,
base::BindPostTask(
task_runner_,
base::BindOnce(&RuntimeServiceImpl::OnApplicationLaunching,
weak_factory_.GetWeakPtr(), request.cast_session_id(),
std::move(reactor))));
}
void RuntimeServiceImpl::HandleStopApplication(
cast::runtime::StopApplicationRequest request,
cast::runtime::RuntimeServiceHandler::StopApplication::Reactor* reactor) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!grpc_server_) {
// gRPC server has been shut down and all reactors have been cancelled.
return;
}
if (request.cast_session_id().empty()) {
LOG(ERROR) << "Session id is missing";
reactor->Write(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"Sesssion id is missing"));
return;
}
RuntimeApplicationServiceImpl* platform_app =
application_dispatcher_->GetApplication(request.cast_session_id());
if (!platform_app) {
LOG(ERROR) << "Application doesn't exist anymore: session_id="
<< request.cast_session_id();
reactor->Write(
grpc::Status(grpc::StatusCode::NOT_FOUND, "Application not found"));
return;
}
LOG(INFO) << "Stopping application: session_id=" << request.cast_session_id();
platform_app->Stop(
request, base::BindOnce(&RuntimeServiceImpl::OnApplicationStopping,
weak_factory_.GetWeakPtr(),
request.cast_session_id(), std::move(reactor)));
}
void RuntimeServiceImpl::HandleHeartbeat(
cast::runtime::HeartbeatRequest request,
cast::runtime::RuntimeServiceHandler::Heartbeat::Reactor* reactor) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!grpc_server_) {
// gRPC server has been shut down and all reactors have been cancelled.
return;
}
DCHECK(!heartbeat_reactor_);
if (!request.has_heartbeat_period() ||
request.heartbeat_period().seconds() <= 0) {
LOG(ERROR) << "Failed to create a heartbeat as period is not valid";
reactor->Write(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"Incorrect heartbeat period"));
return;
}
if (heartbeat_reactor_) {
LOG(WARNING)
<< "Heartbeats are requested with active heartbeat reactor: reactor="
<< heartbeat_reactor_;
heartbeat_reactor_->Write(
grpc::Status(grpc::StatusCode::ABORTED, "Duplicate heartbeat aborted"));
}
heartbeat_period_ = base::Seconds(request.heartbeat_period().seconds());
heartbeat_reactor_ = reactor;
// Set the write callback once for all future calls from gRPC framework.
heartbeat_reactor_->SetWritesAvailableCallback(base::BindPostTask(
task_runner_, base::BindRepeating(&RuntimeServiceImpl::OnHeartbeatSent,
weak_factory_.GetWeakPtr())));
LOG(INFO) << "Starting heartbeat: reactor=" << heartbeat_reactor_
<< ", period=" << heartbeat_period_;
SendHeartbeat();
}
void RuntimeServiceImpl::HandleStartMetricsRecorder(
cast::runtime::StartMetricsRecorderRequest request,
cast::runtime::RuntimeServiceHandler::StartMetricsRecorder::Reactor*
reactor) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!grpc_server_) {
// gRPC server has been shut down and all reactors have been cancelled.
return;
}
if (request.metrics_recorder_service_info().grpc_endpoint().empty()) {
reactor->Write(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"MetricsRecord service endpoint is missing"));
return;
}
LOG(INFO) << "Started recording metrics";
metrics_recorder_stub_.emplace(
request.metrics_recorder_service_info().grpc_endpoint());
reactor->Write(cast::runtime::StartMetricsRecorderResponse());
}
void RuntimeServiceImpl::HandleStopMetricsRecorder(
cast::runtime::StopMetricsRecorderRequest request,
cast::runtime::RuntimeServiceHandler::StopMetricsRecorder::Reactor*
reactor) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!grpc_server_) {
// gRPC server has been shut down and all reactors have been cancelled.
return;
}
if (!metrics_recorder_service_) {
LOG(ERROR) << "Droping metrics recorging stop request as service is not "
"available anymore";
reactor->Write(grpc::Status(grpc::StatusCode::UNAVAILABLE,
"Metrics recording service is not available"));
return;
}
LOG(INFO) << "Stopped recording metrics";
// Just reset the MetricsRecorder stub to stop sending metrics to Core.
metrics_recorder_stub_.reset();
reactor->Write(cast::runtime::StopMetricsRecorderResponse());
}
void RuntimeServiceImpl::OnApplicationLoaded(
std::string session_id,
cast::runtime::RuntimeServiceHandler::LoadApplication::Reactor* reactor,
cast_receiver::Status status) {
auto* platform_app = application_dispatcher_->GetApplication(session_id);
if (!platform_app) {
LOG(ERROR) << "Application doesn't exist anymore: session_id="
<< session_id;
reactor->Write(
grpc::Status(grpc::StatusCode::NOT_FOUND, "Application not found"));
return;
}
if (!status.ok()) {
LOG(ERROR) << "Failed to load application: session_id=" << session_id
<< ", status=" << status;
platform_app->Stop(cast::runtime::StopApplicationRequest(),
base::DoNothing());
application_dispatcher_->DestroyApplication(session_id);
reactor->Write(grpc::Status(static_cast<grpc::StatusCode>(status.code()),
std::string(status.message())));
return;
}
cast::runtime::LoadApplicationResponse response;
response.mutable_message_port_info();
reactor->Write(std::move(response));
}
void RuntimeServiceImpl::OnApplicationLaunching(
std::string session_id,
cast::runtime::RuntimeServiceHandler::LaunchApplication::Reactor* reactor,
cast_receiver::Status status) {
auto* platform_app = application_dispatcher_->GetApplication(session_id);
if (!platform_app) {
LOG(ERROR) << "Application doesn't exist anymore: session_id="
<< session_id;
reactor->Write(
grpc::Status(grpc::StatusCode::NOT_FOUND, "Application not found"));
return;
}
if (!status.ok()) {
LOG(ERROR) << "Failed to launch application: session_id=" << session_id
<< ", status=" << status;
platform_app->Stop(cast::runtime::StopApplicationRequest(),
base::DoNothing());
application_dispatcher_->DestroyApplication(session_id);
reactor->Write(grpc::Status(static_cast<grpc::StatusCode>(status.code()),
std::string(status.message())));
return;
}
reactor->Write(cast::runtime::LaunchApplicationResponse());
}
void RuntimeServiceImpl::OnApplicationStopping(
std::string session_id,
cast::runtime::RuntimeServiceHandler::StopApplication::Reactor* reactor,
cast_receiver::Status status) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
auto platform_app = application_dispatcher_->DestroyApplication(session_id);
DCHECK(platform_app);
// Reset the app only after the response is constructed.
cast::runtime::StopApplicationResponse response;
response.set_app_id(platform_app->app_id());
response.set_cast_session_id(session_id);
reactor->Write(std::move(response));
}
void RuntimeServiceImpl::SendHeartbeat() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!heartbeat_reactor_) {
LOG(WARNING) << "Heartbeat reactor has been destroyed";
return;
}
DVLOG(2) << "Sending heartbeat";
heartbeat_reactor_->Write(cast::runtime::HeartbeatResponse());
}
void RuntimeServiceImpl::OnHeartbeatSent(
grpc::Status status,
cast::runtime::RuntimeServiceHandler::Heartbeat::Reactor* reactor) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// There might be duplicate SendHeartbeat requests from Cast Core. The ones
// that are not associated with |heartbeat_reactor_| must be ignored.
if (reactor != heartbeat_reactor_) {
// The |reactor| was cancelled as the heartbeat response was scheduled
// for send.
LOG(WARNING) << "Ignoring heartbeat from previous request";
return;
}
if (!status.ok()) {
// Runtime failed to send the heartbeat to Core - assume Core has crashed
// and reset.
LOG(ERROR) << "Failed to send heartbeats: "
<< cast::utils::GrpcStatusToString(status);
heartbeat_reactor_ = nullptr;
ResetGrpcServices();
return;
}
// Everything is ok - schedule another heartbeat.
heartbeat_timer_.Start(
FROM_HERE, heartbeat_period_,
base::BindPostTask(task_runner_,
base::BindOnce(&RuntimeServiceImpl::SendHeartbeat,
weak_factory_.GetWeakPtr())));
}
void RuntimeServiceImpl::RecordMetrics(
cast::metrics::RecordRequest request,
CastRuntimeMetricsRecorderService::RecordCompleteCallback
record_complete_callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!metrics_recorder_stub_) {
std::move(record_complete_callback).Run();
return;
}
auto call =
metrics_recorder_stub_
->CreateCall<cast::metrics::MetricsRecorderServiceStub::Record>(
std::move(request));
std::move(call).InvokeAsync(base::BindPostTask(
task_runner_, base::BindOnce(&RuntimeServiceImpl::OnMetricsRecorded,
weak_factory_.GetWeakPtr(),
std::move(record_complete_callback))));
}
void RuntimeServiceImpl::OnMetricsRecorded(
CastRuntimeMetricsRecorderService::RecordCompleteCallback
record_complete_callback,
cast::utils::GrpcStatusOr<cast::metrics::RecordResponse> response_or) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!response_or.ok()) {
LOG(ERROR) << "Failed to record metrics: " << response_or.ToString();
}
std::move(record_complete_callback).Run();
}
void RuntimeServiceImpl::OnMetricsRecorderServiceStopped(
cast::runtime::RuntimeServiceHandler::StopMetricsRecorder::Reactor*
reactor) {
DVLOG(2) << "MetricsRecorderService stopped";
metrics_recorder_service_.reset();
metrics_recorder_stub_.reset();
reactor->Write(cast::runtime::StopMetricsRecorderResponse());
}
} // namespace chromecast