// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/ash/chromebox_for_meetings/artemis/data_aggregator_service.h"
#include "base/task/thread_pool.h"
#include "base/time/time.h"
#include "chrome/browser/ash/chromebox_for_meetings/artemis/log_source.h"
#include "chrome/browser/ash/chromebox_for_meetings/artemis/persistent_db.h"
#include "chrome/browser/ash/chromebox_for_meetings/artemis/specialized_log_sources.h"
#include "chromeos/ash/components/dbus/chromebox_for_meetings/cfm_hotline_client.h"
#include "mojo/public/cpp/bindings/self_owned_receiver.h"
namespace ash::cfm {
namespace {
// Local convenience aliases.
// NOTE(review): CHANGE and REGEX are not referenced anywhere in this file.
using mojom::DataFilter::FilterType::CHANGE;
using mojom::DataFilter::FilterType::REGEX;

// Process-wide instance. Set by Initialize() (or InitializeForTesting()) and
// deleted/cleared by Shutdown(). Already has internal linkage via the
// anonymous namespace, so no `static` is needed.
DataAggregatorService* g_data_aggregator_service = nullptr;

// Period of the timer that drives FetchFromAllSourcesAndEnqueue().
constexpr base::TimeDelta kFetchFrequency = base::Minutes(1);
// Batch size handed to LogSource::Create() for local log sources.
constexpr size_t kDefaultLogBatchSize = 500;  // lines
// The active payload is pushed to the wire once its serialized size reaches
// this many bytes.
constexpr size_t kPayloadMaxSizeBytes = 500000;  // 500 KB
// The active payload is force-pushed once this much time has passed since
// last_upload_time_ was last updated.
constexpr base::TimeDelta kPayloadEnqueueTimeout = base::Minutes(10);
// Upper bound on the number of payloads waiting in the pending queue.
constexpr size_t kMaxPayloadQueueSize = 3;  // # payloads
// Retry schedule for binding the uploader / device-info endpoints through
// the service adaptor.
constexpr base::TimeDelta kServiceAdaptorRetryDelay = base::Seconds(1);
constexpr size_t kServiceAdaptorRetryMaxTries = 5;
// Backoff applied between failed Enqueue() attempts.
constexpr net::BackoffEntry::Policy kEnqueueRetryBackoffPolicy = {
    0,              // Number of initial errors to ignore.
    1000,           // Initial delay in ms.
    2.0,            // Factor by which the waiting time will be multiplied.
    0.2,            // Fuzzing percentage.
    60 * 1000 * 5,  // Maximum delay in ms (5 minutes).
    -1,             // Never discard the entry.
    true,           // Use initial delay.
};

// List of commands that should be polled frequently. Any commands
// being watched by watchdogs should be here.
constexpr base::TimeDelta kDefaultCommandPollFrequency = base::Seconds(5);
// `constexpr` makes both the array and its element pointers immutable.
constexpr const char* kLocalCommandSourcesFastPoll[] = {
    "ip -brief address",
    "lspci",
    "lsusb -t",
};

// List of commands that should be polled at a much slower frequency
// than the default. These are strictly for telemetry purposes in
// cloud logging and should be reserved for commands that don't need
// constant monitoring. Commands that are watched by a watchdog should
// NOT be in this list.
constexpr base::TimeDelta kExtendedCommandPollFrequency = base::Minutes(1);
constexpr const char* kLocalCommandSourcesSlowPoll[] = {
    "df -h",
    "free -m",
    // Hide kernelspace processes and show limited columns.
    "ps -o pid,user,group,args --ppid 2 -p 2 -N --sort=pid",
};

// Local log files, all polled at kDefaultLogPollFrequency. The file-path
// constants come from specialized_log_sources.h; `const char* const` keeps
// the table read-only without assuming those constants are constexpr.
constexpr base::TimeDelta kDefaultLogPollFrequency = base::Seconds(10);
const char* const kLocalLogSources[] = {
    kCfmAuditLogFile,   kCfmBiosInfoLogFile,     kCfmChromeLogFile,
    kCfmCrosEcLogFile,  kCfmEventlogLogFile,     kCfmFwupdLogFile,
    kCfmLacrosLogFile,  kCfmPowerdLogFile,       kCfmSyslogLogFile,
    kCfmUiLogFile,      kCfmUpdateEngineLogFile, kCfmVariationsListLogFile,
};
}  // namespace
// static
// Creates the process-wide instance. Must not be called twice without an
// intervening Shutdown().
void DataAggregatorService::Initialize() {
  CHECK(!g_data_aggregator_service);
  auto* const instance = new DataAggregatorService();
  g_data_aggregator_service = instance;
}
// static
// Installs a caller-provided instance as the global. Shutdown() will delete
// it, so ownership effectively passes to the global slot.
void DataAggregatorService::InitializeForTesting(
    DataAggregatorService* data_aggregator_service) {
  CHECK(!g_data_aggregator_service);
  // Install the injected instance in place of a freshly constructed one.
  g_data_aggregator_service = data_aggregator_service;
}
// static
// Destroys the global instance created by Initialize() (or installed by
// InitializeForTesting()) and clears the global pointer.
void DataAggregatorService::Shutdown() {
  CHECK(g_data_aggregator_service);
  // The instance is destroyed while the global still points at it; the
  // pointer is cleared only after the destructor has returned.
  delete g_data_aggregator_service;
  g_data_aggregator_service = nullptr;
}
// static
// Returns the global instance. Crashes (CHECK) if Initialize() has not run.
DataAggregatorService* DataAggregatorService::Get() {
  CHECK(g_data_aggregator_service)
      << "DataAggregatorService::Get() called before Initialize()";
  DataAggregatorService* const instance = g_data_aggregator_service;
  return instance;
}
// static
// True while a global instance is installed.
bool DataAggregatorService::IsInitialized() {
  return g_data_aggregator_service != nullptr;
}
// Called for incoming service requests. Only requests for the DataAggregator
// interface are accepted; those trigger binding of the service adaptor.
// Returns whether the request was accepted.
bool DataAggregatorService::ServiceRequestReceived(
    const std::string& interface_name) {
  const bool is_data_aggregator =
      interface_name == mojom::DataAggregator::Name_;
  if (is_data_aggregator) {
    service_adaptor_.BindServiceAdaptor();
  }
  return is_data_aggregator;
}
// Invoked when the service adaptor disconnects. All client receivers are
// dropped so this service's lifetime tracks the primary CfmServiceContext.
void DataAggregatorService::OnAdaptorDisconnect() {
  LOG(ERROR) << "mojom::DataAggregator Service Adaptor has been disconnected";
  receivers_.Clear();
}
// Adopts a raw message pipe handed over by the service adaptor and serves the
// DataAggregator interface on it from this object.
void DataAggregatorService::OnBindService(
    mojo::ScopedMessagePipeHandle receiver_pipe) {
  mojo::PendingReceiver<mojom::DataAggregator> pending_receiver(
      std::move(receiver_pipe));
  receivers_.Add(this, std::move(pending_receiver));
}
// Replies with the key of every registered data source: local commands,
// local log paths, and sources added through AddDataSource().
void DataAggregatorService::GetDataSourceNames(
    GetDataSourceNamesCallback callback) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  std::vector<std::string> names;
  names.reserve(data_source_map_.size());
  for (const auto& registered : data_source_map_) {
    names.push_back(registered.first);
  }
  std::move(callback).Run(std::move(names));
}
// Registers an externally provided data source under |source_name|.
// Replies false (and leaves the existing entry untouched) if the name is
// already taken; replies true after the new remote has been stored.
void DataAggregatorService::AddDataSource(
    const std::string& source_name,
    mojo::PendingRemote<mojom::DataSource> new_data_source,
    AddDataSourceCallback callback) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  const bool name_taken = data_source_map_.count(source_name) != 0;
  if (name_taken) {
    LOG(ERROR) << "Attempted to add source name " << source_name
               << " more than once. Disregarding this one.";
  } else {
    data_source_map_[source_name] =
        mojo::Remote<mojom::DataSource>(std::move(new_data_source));
  }
  std::move(callback).Run(!name_taken /* success */);
}
// Attaches |watch_dog| (with |filter|) to the registered data source named
// |source_name|. The callback is forwarded to the data source, which runs
// it. If no such source is registered, the callback is run with false.
void DataAggregatorService::AddWatchDog(
    const std::string& source_name,
    mojom::DataFilterPtr filter,
    mojo::PendingRemote<mojom::DataWatchDog> watch_dog,
    AddWatchDogCallback callback) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  // TODO(b/326440932): add an enum for "watchable" data sources
  // and deny requests that are outside of this list.
  //
  // Use find() rather than operator[]: operator[] would default-insert an
  // unbound remote for an unknown name. Dereferencing that remote here (and
  // again on every FetchFromAllSourcesAndEnqueue() tick) would crash.
  auto it = data_source_map_.find(source_name);
  if (it == data_source_map_.end()) {
    LOG(WARNING) << "Attempted to add a watchdog to a non-existent source: "
                 << source_name;
    std::move(callback).Run(false /* success */);
    return;
  }
  // Pass the callback through to the data source and run it there.
  it->second->AddWatchDog(std::move(filter), std::move(watch_dog),
                          std::move(callback));
}
// Registers a CommandSource that runs |command| every |poll_freq|. The source
// is constructed and owned on |local_task_runner_| (self-owned receiver). This
// sequence keeps the remote end in data_source_map_, keyed by |command|. A
// disconnect re-creates the source via OnLocalCommandDisconnect().
void DataAggregatorService::AddLocalCommandSource(
    const std::string& command,
    const base::TimeDelta& poll_freq) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  CHECK(data_source_map_.count(command) == 0)
      << "Local command '" << command << "' was added twice.";

  // Executed on |local_task_runner_|.
  auto create_on_local_sequence =
      [](mojo::PendingReceiver<mojom::DataSource> receiver,
         const std::string& id, const std::string& cmd,
         const base::TimeDelta& frequency) {
        auto command_source = std::make_unique<CommandSource>(cmd, frequency);
        command_source->AssignDeviceID(id);
        command_source->StartCollectingData();
        mojo::MakeSelfOwnedReceiver(std::move(command_source),
                                    std::move(receiver));
      };

  mojo::Remote<mojom::DataSource> source_remote;
  mojo::PendingReceiver<mojom::DataSource> source_receiver =
      source_remote.BindNewPipeAndPassReceiver();
  local_task_runner_->PostTask(
      FROM_HERE, base::BindOnce(create_on_local_sequence,
                                std::move(source_receiver),
                                active_transport_payload_.permanent_id(),
                                command, poll_freq));

  source_remote.set_disconnect_handler(
      base::BindOnce(&DataAggregatorService::OnLocalCommandDisconnect,
                     base::Unretained(this), command, poll_freq));
  data_source_map_[command] = std::move(source_remote);
}
// Disconnect handler for a local command remote. The dead remote is removed
// from data_source_map_, then AddLocalCommandSource() posts creation of a
// fresh CommandSource under the same key.
void DataAggregatorService::OnLocalCommandDisconnect(
    const std::string& command,
    const base::TimeDelta& poll_freq) {
  LOG(WARNING) << "Local DataSource for '" << command << "' has disconnected; "
               << "attempting to reconnect.";
  // Erase first: AddLocalCommandSource() CHECKs that the key is unused.
  data_source_map_.erase(command);
  AddLocalCommandSource(command, poll_freq);
}
void DataAggregatorService::AddLocalLogSource(const std::string& filepath) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CHECK(data_source_map_.count(filepath) == 0)
<< "Local log file '" << filepath << "' was added twice.";
mojo::Remote<mojom::DataSource> remote;
local_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
[](mojo::PendingReceiver<mojom::DataSource> pending_receiver,
const std::string& device_id, const std::string& filepath) {
auto source = LogSource::Create(filepath, kDefaultLogPollFrequency,
kDefaultLogBatchSize);
source->AssignDeviceID(device_id);
source->StartCollectingData();
mojo::MakeSelfOwnedReceiver(std::move(source),
std::move(pending_receiver));
},
remote.BindNewPipeAndPassReceiver(),
active_transport_payload_.permanent_id(), filepath));
remote.set_disconnect_handler(
base::BindOnce(&DataAggregatorService::OnLocalLogDisconnect,
base::Unretained(this), filepath));
data_source_map_[filepath] = std::move(remote);
}
// Disconnect handler for a local log-file remote. The dead remote is removed
// from data_source_map_, then AddLocalLogSource() posts creation of a fresh
// LogSource for the same path.
void DataAggregatorService::OnLocalLogDisconnect(const std::string& filepath) {
  LOG(WARNING) << "Local DataSource for '" << filepath << "' has disconnected; "
               << "attempting to reconnect.";
  // Erase first: AddLocalLogSource() CHECKs that the key is unused.
  data_source_map_.erase(filepath);
  AddLocalLogSource(filepath);
}
// Disconnect handler installed on |receivers_|. Only logs; the receiver set
// drops the disconnected receiver on its own.
void DataAggregatorService::OnMojoDisconnect() {
  VLOG(3) << "mojom::DataAggregator disconnected";
}
// Registers every built-in local source. Fast-poll commands are added first,
// then slow-poll (telemetry-only) commands, then local log files.
void DataAggregatorService::InitializeLocalSources() {
  for (const char* command : kLocalCommandSourcesFastPoll) {
    VLOG(1) << "Adding command '" << command << "' to sources.";
    AddLocalCommandSource(command, kDefaultCommandPollFrequency);
  }
  for (const char* command : kLocalCommandSourcesSlowPoll) {
    VLOG(1) << "Adding command '" << command << "' to local sources.";
    AddLocalCommandSource(command, kExtendedCommandPollFrequency);
  }
  for (const char* log_path : kLocalLogSources) {
    VLOG(1) << "Adding log file '" << log_path << "' to local sources.";
    AddLocalLogSource(log_path);
  }
}
void DataAggregatorService::InitializeUploadEndpoint(size_t num_tries) {
// Hook into the existing CfmLoggerService.
const std::string kMeetDevicesLoggerInterfaceName =
chromeos::cfm::mojom::MeetDevicesLogger::Name_;
// We'll only be bound if we tried to initialize the endpoint
// already and failed. Just reset and try again.
if (uploader_remote_.is_bound()) {
uploader_remote_.reset();
}
service_adaptor_.GetService(
kMeetDevicesLoggerInterfaceName,
uploader_remote_.BindNewPipeAndPassReceiver().PassPipe(),
base::BindOnce(&DataAggregatorService::OnRequestBindUploadService,
weak_ptr_factory_.GetWeakPtr(),
kMeetDevicesLoggerInterfaceName, num_tries));
}
// Result of the uploader bind request.
// - On success: records the bind time as the start of the upload window and
//   moves on to the device-info endpoint.
// - On failure: re-posts InitializeUploadEndpoint() after
//   kServiceAdaptorRetryDelay, until kServiceAdaptorRetryMaxTries is reached.
void DataAggregatorService::OnRequestBindUploadService(
    const std::string& interface_name,
    size_t num_tries,
    bool success) {
  VLOG(3) << "Uploader RequestBindService result: " << success
          << " for interface: " << interface_name;
  if (!success) {
    if (num_tries < kServiceAdaptorRetryMaxTries) {
      VLOG(3) << "Retrying service adaptor connection in "
              << kServiceAdaptorRetryDelay;
      auto retry = base::BindOnce(
          &DataAggregatorService::InitializeUploadEndpoint,
          weak_ptr_factory_.GetWeakPtr(), num_tries + 1);
      base::SequencedTaskRunner::GetCurrentDefault()->PostDelayedTask(
          FROM_HERE, std::move(retry), kServiceAdaptorRetryDelay);
    } else {
      LOG(ERROR) << "Retry limit reached for connecting to " << interface_name
                 << ". Remote calls will fail.";
    }
    return;
  }
  last_upload_time_ = base::TimeTicks::Now();
  InitializeDeviceInfoEndpoint(/*num_tries=*/0);
}
void DataAggregatorService::InitializeDeviceInfoEndpoint(size_t num_tries) {
// Hook into the existing CfmDeviceInfoService.
const std::string kMeetDevicesInfoInterfaceName =
chromeos::cfm::mojom::MeetDevicesInfo::Name_;
// We'll only be bound if we tried to initialize the endpoint
// already and failed. Just reset and try again.
if (device_info_remote_.is_bound()) {
device_info_remote_.reset();
}
service_adaptor_.GetService(
kMeetDevicesInfoInterfaceName,
device_info_remote_.BindNewPipeAndPassReceiver().PassPipe(),
base::BindOnce(&DataAggregatorService::OnRequestBindDeviceInfoService,
weak_ptr_factory_.GetWeakPtr(),
kMeetDevicesInfoInterfaceName, num_tries));
}
// Result of the device-info bind request.
// - On success: asks for the device ID.
// - On failure: re-posts InitializeDeviceInfoEndpoint() after
//   kServiceAdaptorRetryDelay, until kServiceAdaptorRetryMaxTries is reached.
void DataAggregatorService::OnRequestBindDeviceInfoService(
    const std::string& interface_name,
    size_t num_tries,
    bool success) {
  VLOG(3) << "DeviceInfo RequestBindService result: " << success
          << " for interface: " << interface_name;
  if (!success) {
    if (num_tries < kServiceAdaptorRetryMaxTries) {
      VLOG(3) << "Retrying service adaptor connection in "
              << kServiceAdaptorRetryDelay;
      auto retry = base::BindOnce(
          &DataAggregatorService::InitializeDeviceInfoEndpoint,
          weak_ptr_factory_.GetWeakPtr(), num_tries + 1);
      base::SequencedTaskRunner::GetCurrentDefault()->PostDelayedTask(
          FROM_HERE, std::move(retry), kServiceAdaptorRetryDelay);
    } else {
      LOG(ERROR) << "Retry limit reached for connecting to " << interface_name
                 << ". Remote calls will fail.";
    }
    return;
  }
  RequestDeviceId();
}
// Asks the device-info service for policy info. The reply is routed to
// StoreDeviceId() through a weak pointer.
void DataAggregatorService::RequestDeviceId() {
  auto on_policy_info = base::BindOnce(&DataAggregatorService::StoreDeviceId,
                                       weak_ptr_factory_.GetWeakPtr());
  device_info_remote_->GetPolicyInfo(std::move(on_policy_info));
}
// Receives policy info. Data collection starts only when a device ID is
// present: without one, logs cannot be attributed in cloud logging. When the
// ID is present, it is stamped on the active payload, the local sources are
// registered, and the fetch timer is started.
void DataAggregatorService::StoreDeviceId(
    chromeos::cfm::mojom::PolicyInfoPtr policy_info) {
  if (!policy_info->device_id.has_value()) {
    LOG(ERROR)
        << "Unable to determine device ID! Cloud logging will be disabled.";
    return;
  }
  const std::string& device_id = policy_info->device_id.value();
  active_transport_payload_.set_permanent_id(device_id);
  VLOG(4) << "Assigning device ID " << device_id;
  InitializeLocalSources();
  StartFetchTimer();
}
// Starts fetch_timer_, which invokes FetchFromAllSourcesAndEnqueue() every
// kFetchFrequency. The callback holds a weak pointer, so it becomes a no-op
// once the weak pointers are invalidated.
void DataAggregatorService::StartFetchTimer() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  VLOG(1) << "Artemis started. Listening for data.";
  auto on_fetch_tick = base::BindRepeating(
      &DataAggregatorService::FetchFromAllSourcesAndEnqueue,
      weak_ptr_factory_.GetWeakPtr());
  fetch_timer_.Start(FROM_HERE, kFetchFrequency, std::move(on_fetch_tick));
}
// Timer callback: asks every registered data source for new data. Each reply
// is routed to AppendEntriesToActivePayload() tagged with the source's key.
// The whole tick is skipped while an enqueue is still outstanding.
void DataAggregatorService::FetchFromAllSourcesAndEnqueue() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  if (enqueue_in_progress_) {
    return;
  }
  VLOG(1) << "Fetching data from " << data_source_map_.size() << " sources.";
  for (const auto& [name, remote] : data_source_map_) {
    remote->Fetch(
        base::BindOnce(&DataAggregatorService::AppendEntriesToActivePayload,
                       weak_ptr_factory_.GetWeakPtr(), name));
  }
}
void DataAggregatorService::AppendEntriesToActivePayload(
const std::string& source_name,
const std::vector<std::string>& serialized_entries) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (serialized_entries.empty()) {
return;
}
if (VLOG_IS_ON(4)) {
VLOG(4) << "Appending the following entries: ";
for (auto& entry : serialized_entries) {
VLOG(4) << entry;
}
}
// TODO(b/336777241): use different payloads for different source types.
// Using LogPayload for everything at this time.
proto::LogPayload* log_payload =
active_transport_payload_.mutable_log_payload();
proto::LogSet* log_set = nullptr;
// First check if we already have a LogSet for this data source.
for (auto& set : *(log_payload->mutable_log_sets())) {
if (set.log_source() == source_name) {
log_set = &set;
break;
}
}
// If the current payload doesn't contain a LogSet for this data
// source yet, create a new one.
if (log_set == nullptr) {
log_set = log_payload->add_log_sets();
}
google::protobuf::RepeatedPtrField<proto::LogEntry>* entries =
log_set->mutable_entries();
log_set->set_log_source(source_name);
// Deserialize the entries back into protos and append them to the payload.
for (const auto& entry_str : serialized_entries) {
proto::LogEntry entry;
if (!entry.ParseFromString(entry_str)) {
LOG(WARNING) << "Unable to parse entry. Dropping '" << entry_str << "'";
} else {
entries->Add(std::move(entry));
}
}
if (IsPayloadReadyForUpload()) {
VLOG(1) << "Payload is ready to be enqueued. Pushing to wire.";
AddActivePayloadToPendingQueue();
EnqueueNextPendingTransportPayload();
}
}
bool DataAggregatorService::IsPayloadReadyForUpload() const {
// Flush the payload to the wire if it exceeds our max size.
if (active_transport_payload_.ByteSizeLong() >= kPayloadMaxSizeBytes) {
VLOG(4) << "Payload reached maximum size; pushing to wire";
return true;
}
// Use a timeout to force flush to the wire. This ensures that we're
// always uploading data, even in the event of a data "stall", where
// a small amount of data is available for an extended period of time.
if ((base::TimeTicks::Now() - last_upload_time_) >= kPayloadEnqueueTimeout) {
VLOG(4) << "Payload timeout reached; force pushing";
return true;
}
return false;
}
// Moves the active payload's LogPayload into a new TransportPayload stamped
// with the device ID and the current Unix time in ms, then pushes it onto the
// pending queue. The queue is capped at kMaxPayloadQueueSize entries.
void DataAggregatorService::AddActivePayloadToPendingQueue() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  const auto collection_time_ms =
      (base::Time::Now() - base::Time::UnixEpoch()).InMilliseconds();

  // Transfer the LogPayload by pointer (release + set_allocated) rather than
  // deep-copying it. The active payload is left without a LogPayload; the
  // next append creates a fresh one via mutable_log_payload().
  proto::TransportPayload pending_payload;
  pending_payload.set_permanent_id(active_transport_payload_.permanent_id());
  pending_payload.set_collection_timestamp_ms(collection_time_ms);
  pending_payload.set_allocated_log_payload(
      active_transport_payload_.release_log_payload());
  pending_transport_payloads_.push(std::move(pending_payload));

  // NOTE(review): the dropped element is front(). That is the payload
  // EnqueueNextPendingTransportPayload() sends and HandleEnqueueResponse()
  // flushes/pops. If an enqueue is in flight when this triggers, the response
  // will act on the wrong payload. Confirm the queue cannot overflow while an
  // enqueue is outstanding.
  if (pending_transport_payloads_.size() > kMaxPayloadQueueSize) {
    LOG(WARNING) << "Payload queue grew too large. Dropping oldest.";
    pending_transport_payloads_.pop();
  }
  VLOG(3) << "Pushed payload into pending queue. New size: "
          << pending_transport_payloads_.size();
}
// Sends the oldest pending payload to the uploader at kLow priority and marks
// an enqueue as in progress. The reply goes to HandleEnqueueResponse(). This
// is also the retry entry point posted by HandleEnqueueResponse() after a
// failure.
void DataAggregatorService::EnqueueNextPendingTransportPayload() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  if (pending_transport_payloads_.empty()) {
    LOG(WARNING) << "Requested payload enqueue, but payload queue is empty.";
    // Nothing is sent, so this call schedules no HandleEnqueueResponse().
    // A retry posted by HandleEnqueueResponse() arrives with
    // enqueue_in_progress_ still true. Leaving it set would make
    // FetchFromAllSourcesAndEnqueue() return early forever.
    enqueue_in_progress_ = false;
    return;
  }
  auto enqueue_success_callback =
      base::BindOnce(&DataAggregatorService::HandleEnqueueResponse,
                     weak_ptr_factory_.GetWeakPtr());
  enqueue_in_progress_ = true;
  // TODO(b/339455254): have each data source specify a priority instead
  // of assuming kLow for every enqueue.
  uploader_remote_->Enqueue(
      pending_transport_payloads_.front().SerializeAsString(),
      chromeos::cfm::mojom::EnqueuePriority::kLow,
      std::move(enqueue_success_callback));
}
// Uploader reply for the payload at the front of the pending queue.
// - On failure: the same payload is retried after an exponential backoff
//   delay, and enqueue_in_progress_ stays set.
// - On success: every data source in the payload is Flush()ed, the payload is
//   popped, and the next pending payload (if any) is sent.
void DataAggregatorService::HandleEnqueueResponse(
    chromeos::cfm::mojom::LoggerStatusPtr status) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  // Every response is interpreted against pending_transport_payloads_.front().
  // If the queue has already been drained (e.g. an extra response for a
  // payload that an earlier response popped), there is nothing to retry,
  // flush or pop. Calling front() on an empty queue is undefined behavior.
  if (pending_transport_payloads_.empty()) {
    LOG(WARNING) << "Received enqueue response, but payload queue is empty.";
    enqueue_in_progress_ = false;
    return;
  }

  if (status->code != chromeos::cfm::mojom::LoggerErrorCode::kOk) {
    enqueue_retry_backoff_.InformOfRequest(/*succeeded=*/false);
    auto retry_delay = enqueue_retry_backoff_.GetTimeUntilRelease();
    LOG(ERROR) << "Recent enqueue failed with error code: " << status->code
               << ". Trying again in " << retry_delay;
    base::SequencedTaskRunner::GetCurrentDefault()->PostDelayedTask(
        FROM_HERE,
        base::BindOnce(
            &DataAggregatorService::EnqueueNextPendingTransportPayload,
            weak_ptr_factory_.GetWeakPtr()),
        retry_delay);
    return;
  }

  VLOG(1) << "Recent enqueue succeeded.";
  enqueue_retry_backoff_.Reset();

  // Flush() every data source in the uploaded payload so it can update its
  // internal pointers. For non-incremental sources this is likely a no-op.
  // Use find() rather than operator[]: an unexpected name must not insert an
  // unbound remote that is then dereferenced.
  const proto::LogPayload& log_payload =
      pending_transport_payloads_.front().log_payload();
  for (const auto& log_set : log_payload.log_sets()) {
    const std::string& source_name = log_set.log_source();
    auto it = data_source_map_.find(source_name);
    if (it == data_source_map_.end()) {
      LOG(WARNING) << "Uploaded payload references unknown data source '"
                   << source_name << "'; skipping Flush().";
      continue;
    }
    it->second->Flush();
  }

  // Clean up.
  enqueue_in_progress_ = false;
  last_upload_time_ = base::TimeTicks::Now();
  pending_transport_payloads_.pop();

  // Try another transfer if the queue is still populated.
  if (!pending_transport_payloads_.empty()) {
    VLOG(2) << "More payloads in queue; enqueueing.";
    EnqueueNextPendingTransportPayload();
  }
}
// Sets up the service and starts the endpoint connection chain:
// InitializeUploadEndpoint() -> InitializeDeviceInfoEndpoint() ->
// RequestDeviceId() -> StoreDeviceId().
// Data collection (local sources and the fetch timer) does not start here. It
// starts in StoreDeviceId(), once a device ID has been received.
DataAggregatorService::DataAggregatorService()
    : service_adaptor_(mojom::DataAggregator::Name_, this),
      enqueue_retry_backoff_(&kEnqueueRetryBackoffPolicy) {
  CfmHotlineClient::Get()->AddObserver(this);
  // Detach so sequence_checker_ binds to the first sequence that runs a
  // DCHECK_CALLED_ON_VALID_SEQUENCE check, not to the constructing sequence.
  DETACH_FROM_SEQUENCE(sequence_checker_);
  // NOTE(review): Unretained presumably relies on |receivers_| being a member
  // destroyed together with |this| — confirm against the header.
  receivers_.set_disconnect_handler(base::BindRepeating(
      &DataAggregatorService::OnMojoDisconnect, base::Unretained(this)));
  // Blocking-capable sequence that hosts the local command/log sources (see
  // AddLocalCommandSource/AddLocalLogSource) and the persistent DB.
  local_task_runner_ =
      base::ThreadPool::CreateSequencedTaskRunner({base::MayBlock()});
  local_task_runner_->PostTask(FROM_HERE,
                               base::BindOnce(&PersistentDb::Initialize));
  VLOG(1) << "Starting Artemis...";
  InitializeUploadEndpoint(/*num_tries=*/0);
}
// Posts PersistentDb::Shutdown() to the same task runner that ran
// PersistentDb::Initialize(), then stops observing CfmHotlineClient.
DataAggregatorService::~DataAggregatorService() {
  auto shutdown_persistent_db = base::BindOnce(&PersistentDb::Shutdown);
  local_task_runner_->PostTask(FROM_HERE, std::move(shutdown_persistent_db));
  CfmHotlineClient::Get()->RemoveObserver(this);
}
} // namespace ash::cfm