chromium/components/fuchsia_legacymetrics/legacymetrics_client.cc

// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "components/fuchsia_legacymetrics/legacymetrics_client.h"

#include <lib/fit/function.h>
#include <lib/sys/cpp/component_context.h>
#include <zircon/errors.h>

#include <algorithm>
#include <iterator>
#include <memory>
#include <utility>
#include <vector>

#include "base/fuchsia/fuchsia_logging.h"
#include "base/fuchsia/process_context.h"
#include "base/functional/callback_helpers.h"
#include "base/logging.h"
#include "base/time/time.h"
#include "components/fuchsia_legacymetrics/legacymetrics_histogram_flattener.h"

namespace fuchsia_legacymetrics {

// kMaxBatchSize, kInitialReconnectDelay, kMaxReconnectDelay and
// kReconnectBackoffFactor are `static constexpr` members initialized in the
// class definition. Since C++17 such members are implicitly inline, so the
// out-of-line definitions that used to live here were redundant (and are
// deprecated); no namespace-scope definitions are needed.

LegacyMetricsClient::LegacyMetricsClient() = default;

// Must be destroyed on the sequence the client was used on.
// NOTE(review): closures still queued in |on_flush_complete_closures_| are
// destroyed here without being run; confirm no caller relies on them firing.
LegacyMetricsClient::~LegacyMetricsClient() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
}

// Switches the client to manual mode, where the MetricsRecorder connection is
// supplied through SetMetricsRecorder() instead of being opened from the
// process' component context.
void LegacyMetricsClient::DisableAutoConnect() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  // |report_interval_| is only assigned by Start(), so a zero interval means
  // we are still in the setup phase. Auto-connect may be disabled only once.
  DCHECK_EQ(report_interval_, base::TimeDelta())
      << "DisableAutoConnect() must be called before Start().";
  DCHECK(auto_connect_);

  auto_connect_ = false;
}

// Binds the client to |metrics_recorder| (manual mode only), dropping any
// state tied to a previous connection first. Periodic reporting resumes
// immediately if Start() has already been called.
void LegacyMetricsClient::SetMetricsRecorder(
    fidl::InterfaceHandle<fuchsia::legacymetrics::MetricsRecorder>
        metrics_recorder) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!auto_connect_);

  // Resetting may finish an in-progress flush, and a flush-completion closure
  // is allowed to delete |this|. Take a weak pointer beforehand to detect it.
  const auto self = weak_factory_.GetWeakPtr();
  ResetMetricsRecorderState();
  if (!self)
    return;

  SetMetricsRecorderInternal(std::move(metrics_recorder));

  // Before Start() there is no reporting interval, so nothing to schedule.
  if (!report_interval_.is_positive())
    return;
  ScheduleNextReport();
}

// Begins recording user actions and reporting metrics every
// |report_interval|. In auto-connect mode this also opens the MetricsRecorder
// connection; in manual mode reporting starts once a recorder is available.
void LegacyMetricsClient::Start(base::TimeDelta report_interval) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK_GT(report_interval, base::Seconds(0));

  // The interval must be in place before anything schedules a report.
  report_interval_ = report_interval;

  // From here on, user actions are captured for inclusion in reports.
  user_events_recorder_ = std::make_unique<LegacyMetricsUserActionRecorder>();

  if (auto_connect_) {
    ConnectFromComponentContext();
  }

  // Covers a recorder handed over via SetMetricsRecorder() before Start().
  if (metrics_recorder_) {
    ScheduleNextReport();
  }
}

// Registers |callback|, which is asked for extra events at the start of every
// report. May be set only once, before the client is connected.
void LegacyMetricsClient::SetReportAdditionalMetricsCallback(
    ReportAdditionalMetricsCallback callback) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(callback);
  DCHECK(!report_additional_callback_);
  DCHECK(!metrics_recorder_)
      << "SetReportAdditionalMetricsCallback() must be called before Start().";

  report_additional_callback_ = std::move(callback);
}

// Registers |callback|, which FlushAndDisconnect() runs (once) before sending
// the final report; the report is deferred until the callback signals back.
void LegacyMetricsClient::SetNotifyFlushCallback(NotifyFlushCallback callback) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!metrics_recorder_)
      << "SetNotifyFlushCallback() must be called before Start().";
  DCHECK(callback);

  notify_flush_callback_ = std::move(callback);
}

void LegacyMetricsClient::ConnectFromComponentContext() {
  DCHECK(!metrics_recorder_) << "Trying to connect when already connected.";
  DVLOG(1) << "Trying to connect to MetricsRecorder service.";
  DCHECK(auto_connect_);

  fidl::InterfaceHandle<fuchsia::legacymetrics::MetricsRecorder>
      metrics_recorder;
  base::ComponentContextForProcess()->svc()->Connect(
      metrics_recorder.NewRequest());
  SetMetricsRecorderInternal(std::move(metrics_recorder));

  ScheduleNextReport();
}

// Binds |metrics_recorder_| to the given channel and routes its connection
// errors and OnCloseSoon events back into this client.
void LegacyMetricsClient::SetMetricsRecorderInternal(
    fidl::InterfaceHandle<fuchsia::legacymetrics::MetricsRecorder>
        metrics_recorder) {
  metrics_recorder_.Bind(std::move(metrics_recorder));

  metrics_recorder_.set_error_handler(
      [this](zx_status_t status) { OnMetricsRecorderDisconnected(status); });
  metrics_recorder_.events().OnCloseSoon = [this]() { OnCloseSoon(); };
}

// Arms |report_timer_| to call StartReport() after |report_interval_|, unless
// it is already running. Must not be called while a flush is in progress.
void LegacyMetricsClient::ScheduleNextReport() {
  DCHECK(!is_flushing_);

  // Several paths (Start(), ConnectFromComponentContext(), DrainBuffer(), ...)
  // request scheduling; keep an already-armed deadline rather than pushing it.
  if (report_timer_.IsRunning())
    return;

  DVLOG(1) << "Scheduling next report in " << report_interval_.InSeconds()
           << " seconds.";
  report_timer_.Start(FROM_HERE, report_interval_, this,
                      &LegacyMetricsClient::StartReport);
}

// Kicks off a report. If an additional-metrics callback is registered, it is
// handed a callback bound (via a weak pointer) to Report(), which the embedder
// is expected to invoke with its extra events; otherwise Report() runs
// immediately with no extra events.
void LegacyMetricsClient::StartReport() {
  if (report_additional_callback_) {
    auto on_additional_metrics = base::BindOnce(&LegacyMetricsClient::Report,
                                                weak_factory_.GetWeakPtr());
    report_additional_callback_.Run(std::move(on_additional_metrics));
  } else {
    Report({});
  }
}

// Appends |events| (extra events from |report_additional_callback_|, if any),
// histogram deltas and recorded user actions to |to_send_|, then starts
// draining the buffer to the MetricsRecorder.
void LegacyMetricsClient::Report(
    std::vector<fuchsia::legacymetrics::Event> events) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DVLOG(1) << __func__ << " called.";

  // The connection might have dropped while additional metrics were being
  // collected. Keep the already-collected |events| buffered so they are sent
  // with the next report after the connection is reestablished. Histograms
  // and user events are not taken here, so (presumably) they remain pending
  // in their sources until then.
  if (!metrics_recorder_) {
    std::move(events.begin(), events.end(), std::back_inserter(to_send_));
    return;
  }

  // Include histograms.
  for (auto& histogram : GetLegacyMetricsDeltas()) {
    fuchsia::legacymetrics::Event histogram_event;
    histogram_event.set_histogram(std::move(histogram));
    events.push_back(std::move(histogram_event));
  }

  // Include user events. |user_events_recorder_| is only created by Start(),
  // but a report can run earlier: FlushAndDisconnect() (or an OnCloseSoon
  // event) may arrive on a recorder passed to SetMetricsRecorder() before
  // Start() is called.
  if (user_events_recorder_ && user_events_recorder_->HasEvents()) {
    for (auto& event : user_events_recorder_->TakeEvents()) {
      fuchsia::legacymetrics::Event user_event;
      user_event.set_user_action_event(std::move(event));
      events.push_back(std::move(user_event));
    }
  }

  std::move(events.begin(), events.end(), std::back_inserter(to_send_));

  DrainBuffer();
}

void LegacyMetricsClient::DrainBuffer() {
  DVLOG(1) << __func__ << " called.";

  if (record_ack_pending_) {
    // There is a Record() call already inflight. When it is acknowledged,
    // buffer draining will continue.
    return;
  }

  if (to_send_.empty()) {
    DVLOG(1) << "Buffer drained.";

    if (is_flushing_) {
      metrics_recorder_.Unbind();
      CompleteFlush();
      return;
    }

    ScheduleNextReport();
    return;
  }

  // Since ordering doesn't matter, we can efficiently drain |to_send_| by
  // repeatedly sending and truncating its tail.
  const size_t batch_size = std::min(to_send_.size(), kMaxBatchSize);
  const size_t batch_start_idx = to_send_.size() - batch_size;
  std::vector<fuchsia::legacymetrics::Event> batch;
  batch.resize(batch_size);
  std::move(to_send_.begin() + batch_start_idx, to_send_.end(), batch.begin());
  to_send_.resize(to_send_.size() - batch_size);

  record_ack_pending_ = true;
  metrics_recorder_->Record(std::move(batch), [this]() {
    record_ack_pending_ = false;

    // Reset the reconnect delay after a successful Record() call.
    reconnect_delay_ = kInitialReconnectDelay;

    DrainBuffer();
  });
}

// Error handler for |metrics_recorder_|. In auto-connect mode, a peer-closed
// channel schedules a reconnect attempt with exponential backoff (capped at
// |kMaxReconnectDelay|). Connection-scoped state is reset in every case.
void LegacyMetricsClient::OnMetricsRecorderDisconnected(zx_status_t status) {
  ZX_LOG(ERROR, status) << "MetricsRecorder connection lost.";

  const bool should_reconnect = auto_connect_ && status == ZX_ERR_PEER_CLOSED;
  if (should_reconnect) {
    DVLOG(1) << "Scheduling reconnect after " << reconnect_delay_;

    // The attempt uses the current delay; only afterwards is the delay grown
    // for the next attempt. No jitter: few clients are expected to reconnect
    // to the service simultaneously.
    reconnect_timer_.Start(FROM_HERE, reconnect_delay_, this,
                           &LegacyMetricsClient::ReconnectMetricsRecorder);
    const base::TimeDelta next_delay =
        reconnect_delay_ * kReconnectBackoffFactor;
    reconnect_delay_ = std::min(next_delay, kMaxReconnectDelay);
  }

  // May complete an in-progress flush (and thereby destroy |this|), so it
  // must stay the last statement.
  ResetMetricsRecorderState();
}

// Fired by |reconnect_timer_|: reopens the MetricsRecorder connection.
// ConnectFromComponentContext() also arms the report timer, so no separate
// ScheduleNextReport() call is needed here (it would be a no-op).
void LegacyMetricsClient::ReconnectMetricsRecorder() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DVLOG(1) << __func__ << " called.";

  ConnectFromComponentContext();
}

// Stops periodic reporting, sends everything collected so far, then unbinds
// from the MetricsRecorder. |on_flush_complete|, if non-null, runs when the
// flush finishes, possibly synchronously from within this call. Calls made
// while a flush is already in progress share its completion.
void LegacyMetricsClient::FlushAndDisconnect(
    base::OnceClosure on_flush_complete) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DVLOG(1) << __func__ << " called.";

  if (on_flush_complete)
    on_flush_complete_closures_.push_back(std::move(on_flush_complete));

  if (is_flushing_)
    return;

  report_timer_.AbandonAndStop();

  is_flushing_ = true;

  if (!metrics_recorder_) {
    // Not connected: Report() would return early, so the flush would never
    // complete, and a later reconnect would then trip DCHECK(!is_flushing_) in
    // ScheduleNextReport(). There is nothing to send, so finish right away.
    // Also drop any pending reconnect so that the client ends up disconnected,
    // as it does after a regular flush. CompleteFlush() may destroy |this|.
    reconnect_timer_.AbandonAndStop();
    CompleteFlush();
    return;
  }

  if (notify_flush_callback_) {
    // Defer reporting until the flush operation has finished.
    std::move(notify_flush_callback_)
        .Run(base::BindOnce(&LegacyMetricsClient::StartReport,
                            weak_factory_.GetWeakPtr()));
  } else {
    StartReport();
  }
}

void LegacyMetricsClient::OnCloseSoon() {
  FlushAndDisconnect(base::OnceClosure());
}

void LegacyMetricsClient::CompleteFlush() {
  DCHECK(is_flushing_);

  is_flushing_ = false;

  // One of the callbacks may destroy |this|, so move them all to the stack
  // first.
  std::vector<base::OnceClosure> on_flush_complete_closures;
  on_flush_complete_closures.swap(on_flush_complete_closures_);
  for (auto& closure : on_flush_complete_closures) {
    std::move(closure).Run();
  }
}

// Clears state tied to the current MetricsRecorder connection: stops periodic
// reports, forgets any in-flight Record() acknowledgement, and completes an
// in-progress flush (which may destroy |this|).
void LegacyMetricsClient::ResetMetricsRecorderState() {
  record_ack_pending_ = false;

  // No more metric events are reported until reporting is rescheduled.
  report_timer_.AbandonAndStop();

  if (!is_flushing_)
    return;
  CompleteFlush();
}

}  // namespace fuchsia_legacymetrics