// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/browser/metrics/structured/storage_manager_impl.h"

#include <algorithm>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

#include "base/system/sys_info.h"
#include "base/task/bind_post_task.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool.h"
#include "base/types/expected.h"
#include "chrome/browser/metrics/structured/arena_event_buffer.h"
#include "chrome/browser/metrics/structured/event_logging_features.h"
#include "components/metrics/structured/histogram_util.h"
#include "components/metrics/structured/structured_metrics_features.h"
namespace metrics::structured {
namespace {
using ::google::protobuf::Arena;
using ::google::protobuf::RepeatedPtrField;
// Default paths for Storage Manager on ChromeOS.
constexpr char kArenaProtoDefaultPath[] =
"/var/lib/metrics/structured/chromium/storage/initial-events";
constexpr char kFlushedEventsDefaultDir[] =
"/var/lib/metrics/structured/chromium/storage/flushed";
// Path of the partition used to store flushed events.
constexpr char kRootPartitionPath[] = "/var/lib/metrics/structured/";
// These minimum values are rough guesses at what is reasonable for low-end
// devices.
constexpr int64_t kMinBufferSize = 10 * 1024;  // 10 KiB
constexpr int64_t kMinDiskSize = 50 * 1024;    // 50 KiB
} // namespace
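
// Creates a storage manager that stores events under the default on-device
// paths.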
StorageManagerImpl::StorageManagerImpl(const StorageManagerConfig& config)
: StorageManagerImpl(config,
base::FilePath(kArenaProtoDefaultPath),
base::FilePath(kFlushedEventsDefaultDir)) {}
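
// Initializes the flushed-events map, a background task runner used for flush
// callbacks and cleanup work, and the arena-backed in-memory event buffer, and
// records the configured size limits.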
StorageManagerImpl::StorageManagerImpl(const StorageManagerConfig& config,
const base::FilePath& events_path,
const base::FilePath& flush_dir)
: config_(config),
flushed_map_(flush_dir, config_.disk_max_bytes),
task_runner_(base::ThreadPool::CreateSequencedTaskRunner(
{base::TaskPriority::BEST_EFFORT, base::MayBlock(),
base::TaskShutdownBehavior::SKIP_ON_SHUTDOWN})) {
LogMaxMemorySizeKb(config_.buffer_max_bytes / 1024);
LogMaxDiskSizeKb(config_.disk_max_bytes / 1024);
event_buffer_ = std::make_unique<ArenaEventBuffer>(
events_path,
/*write_delay=*/base::Minutes(0), config_.buffer_max_bytes);
}

StorageManagerImpl::~StorageManagerImpl() = default;
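
// Adds a single event to the in-memory buffer. Depending on the result, the
// buffer may be flushed to disk, and the event is re-added if the buffer was
// full. The outcome is recorded with LogStorageManagerRecordStatus().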
void StorageManagerImpl::AddEvent(StructuredEventProto event) {
const Result result = event_buffer_->AddEvent(event);
  // By default we assume the add was successful; it is only an error if the
  // result is kError.
RecordStatus status;
switch (result) {
case Result::kOk:
// Event added successfully.
status = RecordStatus::kOk;
break;
case Result::kShouldFlush:
// Flush the buffer.
FlushBuffer();
status = RecordStatus::kFlushed;
break;
case Result::kFull:
// Flush the buffer then add event again.
FlushAndAddEvent(std::move(event));
status = RecordStatus::kFull;
break;
case Result::kError:
status = RecordStatus::kError;
break;
}
LogStorageManagerRecordStatus(status);
}

// Takes events from the in-memory buffer and from any flushed files on disk.
//
// This is a blocking operation since it may read events from disk, so it is
// best run from a task that may block.
RepeatedPtrField<StructuredEventProto> StorageManagerImpl::TakeEvents() {
RepeatedPtrField<StructuredEventProto> events;
if (event_buffer_->Size() != 0) {
TakeFromInMemory(&events);
}
if (!flushed_map_.empty()) {
TakeFromDisk(&events);
}
return events;
}

// The implementation only reports how many events are in memory, or whether
// any events are stored on disk.
//
// It would be too expensive to count the on-disk events to get an accurate
// value, i.e. if there are no events in memory but some on disk, this still
// returns a value > 0.
int StorageManagerImpl::RecordedEventsCount() const {
uint64_t size = event_buffer_->Size();
return size ? size : flushed_map_.keys().size();
}
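
// Deletes all recorded events, both in-memory and flushed to disk.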
void StorageManagerImpl::Purge() {
event_buffer_->Purge();
flushed_map_.Purge();
}
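
// Adds each event individually so the usual flush handling in AddEvent()
// applies.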
void StorageManagerImpl::AddBatchEvents(
const RepeatedPtrField<StructuredEventProto>& events) {
for (const auto& event : events) {
AddEvent(event);
}
}
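
// Builds a config from the device's resources: the disk quota is a fraction
// of the free space on the metrics partition and the buffer limit is a
// fraction of physical memory, each clamped to a minimum value.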
// static
StorageManagerConfig StorageManagerImpl::GetStorageManagerConfig() {
int64_t free_disk_space =
base::SysInfo::AmountOfFreeDiskSpace(base::FilePath(kRootPartitionPath));
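  // AmountOfFreeDiskSpace() returns -1 on failure; treat that as no space
  // being available.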
if (free_disk_space == -1) {
free_disk_space = 0;
}
free_disk_space = GetMaxDiskSizeRatio() * free_disk_space;
int64_t buffer_max_size =
base::SysInfo::AmountOfPhysicalMemory() * GetMaxBufferSizeRatio();
return StorageManagerConfig{
.buffer_max_bytes = std::max(buffer_max_size, kMinBufferSize),
.disk_max_bytes = std::max(free_disk_space, kMinDiskSize),
};
}
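
// Flushes the in-memory buffer and then re-adds the event that could not be
// added because the buffer was full.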
void StorageManagerImpl::FlushAndAddEvent(StructuredEventProto&& event) {
FlushBuffer();
  // The buffer should have been cleared by this point.
  event_buffer_->AddEvent(std::move(event));
}
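
// Writes the contents of the in-memory buffer to a new flushed file managed
// by |flushed_map_|. OnFlushCompleted() runs on |task_runner_| once the flush
// finishes.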
void StorageManagerImpl::FlushBuffer() {
  // The buffer is written to disk asynchronously, but it is cleared
  // synchronously and can be written to again once this function returns.
flushed_map_.Flush(
*event_buffer_,
base::BindPostTask(task_runner_,
base::BindOnce(&StorageManagerImpl::OnFlushCompleted,
weak_factory_.GetWeakPtr())));
}
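
// Called on |task_runner_| with the result of a flush. On success observers
// are notified; on failure the error is recorded and, for quota errors, a
// cleanup task is scheduled.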
void StorageManagerImpl::OnFlushCompleted(
base::expected<FlushedKey, FlushError> key) {
// If |key| has a value then the events were successfully written to disk.
// Otherwise, there was an error while preparing the events or while writing
// the file.
if (key.has_value()) {
LogStorageManagerFlushStatus(StorageManagerFlushStatus::kSuccessful);
NotifyOnFlushed(*key);
return;
}
switch (key.error()) {
case FlushError::kQuotaExceeded:
LogStorageManagerFlushStatus(StorageManagerFlushStatus::kQuotaExceeded);
      // The file has already been flushed; just clean up until we are back
      // under quota.
if (dropping_flushed_queued_.load()) {
break;
}
dropping_flushed_queued_.store(true);
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&StorageManagerImpl::DropFlushedUntilUnderQuota,
weak_factory_.GetWeakPtr()));
break;
// The write failed and we are unable to recover. The events that were being
// written are unrecoverable and are lost.
// TODO(b/342008451): Recover lost events when writing flushed events fails.
case FlushError::kDiskFull:
LogStorageManagerFlushStatus(StorageManagerFlushStatus::kDiskFull);
break;
case FlushError::kWriteError:
LogStorageManagerFlushStatus(StorageManagerFlushStatus::kWriteError);
break;
case FlushError::kSerializationFailed:
LogStorageManagerFlushStatus(
StorageManagerFlushStatus::kEventSerializationError);
break;
}
}
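
// Copies the in-memory events into |output| and then clears the buffer.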
void StorageManagerImpl::TakeFromInMemory(
RepeatedPtrField<StructuredEventProto>* output) {
LogInMemoryEventsAtUpload(event_buffer_->Size());
  // Copy the events out of the buffer. We have to copy here because the
  // events stored in |event_buffer_| are allocated on an arena.
output->MergeFrom(event_buffer_->Serialize());
// Clear in-memory events and update the on-disk backup.
event_buffer_->Purge();
}
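
// Moves the events of every flushed file into |output| and schedules the
// flushed files for deletion.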
void StorageManagerImpl::TakeFromDisk(
RepeatedPtrField<StructuredEventProto>* output) {
LogFlushedBuffersAtUpload(flushed_map_.keys().size());
for (const auto& key : flushed_map_.keys()) {
std::optional<EventsProto> events = flushed_map_.ReadKey(key);
if (!events.has_value()) {
continue;
}
    // Extract the events to avoid the deep copy that MergeFrom would perform.
    // This should be more efficient despite the additional allocation.
std::vector<StructuredEventProto*> elements(events->events_size(), nullptr);
output->Reserve(output->size() + events->events_size());
events->mutable_events()->ExtractSubrange(0, events->events_size(),
elements.data());
for (auto* event : elements) {
output->AddAllocated(event);
}
}
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&StorageManagerImpl::CleanupFlushed,
weak_factory_.GetWeakPtr(), DeleteReason::kUploaded));
}
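
// Deletes all flushed files and notifies observers of each deleted key with
// |reason|.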
void StorageManagerImpl::CleanupFlushed(DeleteReason reason) {
  // Create a copy of the flushed keys so observers can be notified after the
  // purge.
std::vector<FlushedKey> keys = flushed_map_.keys();
// All files are being deleted.
flushed_map_.Purge();
// Notify |delegate_| that the flushed files have been deleted.
for (const auto& key : keys) {
NotifyOnDeleted(key, reason);
}
}
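
// Drops the oldest flushed files until on-disk usage is back under quota,
// notifying observers of each dropped key.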
void StorageManagerImpl::DropFlushedUntilUnderQuota() {
int64_t delta = flushed_map_.resource_info().used_size_bytes -
flushed_map_.resource_info().max_size_bytes;
  // Check whether any flushed events actually need to be dropped.
if (delta <= 0) {
return;
}
LogDiskQuotaExceededDelta(delta / 1024);
  // The keys are ordered by creation time. Since we drop the oldest files
  // first, we can start from the front of the list.
std::vector<FlushedKey> dropped;
for (const auto& key : flushed_map_.keys()) {
if (delta <= 0) {
break;
}
dropped.push_back(key);
delta -= key.size;
}
LogDeletedBuffersWhenOverQuota(dropped.size());
// Deletes the keys asynchronously.
flushed_map_.DeleteKeys(dropped);
for (const auto& key : dropped) {
NotifyOnDeleted(key, DeleteReason::kExceededQuota);
}
dropping_flushed_queued_.store(false);
}

}  // namespace metrics::structured