chromium/chromecast/media/audio/audio_output_service/receiver/audio_output_service_receiver.cc

// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chromecast/media/audio/audio_output_service/receiver/audio_output_service_receiver.h"

#include <utility>

#include "base/check.h"
#include "base/logging.h"
#include "base/memory/weak_ptr.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "chromecast/media/api/cma_backend_factory.h"
#include "chromecast/media/audio/audio_output_service/constants.h"
#include "chromecast/media/audio/audio_output_service/output_socket.h"
#include "chromecast/media/audio/audio_output_service/receiver/cma_backend_shim.h"

namespace chromecast {
namespace media {
namespace audio_output_service {

namespace {

constexpr base::TimeDelta kInactivityTimeout = base::Seconds(5);

enum MessageTypes : int {
  kUpdateMediaTime = 1,
  kBackendInitialized = 2,
};

}  // namespace

class AudioOutputServiceReceiver::Stream
    : public audio_output_service::OutputSocket::Delegate,
      public audio_output_service::CmaBackendShim::Delegate {
 public:
  Stream(AudioOutputServiceReceiver* receiver,
         std::unique_ptr<OutputSocket> socket)
      : receiver_(receiver), socket_(std::move(socket)) {
    DCHECK(receiver_);
    DCHECK(socket_);

    socket_->SetDelegate(this);

    inactivity_timer_.Start(FROM_HERE, kInactivityTimeout, this,
                            &Stream::OnInactivityTimeout);
  }
  Stream(const Stream&) = delete;
  Stream& operator=(const Stream&) = delete;
  ~Stream() override = default;

  // OutputSocket::Delegate implementation:
  bool HandleMetadata(const Generic& message) override {
    last_receive_time_ = base::TimeTicks::Now();
    inactivity_timer_.Reset();

    if (message.has_heartbeat()) {
      return true;
    }

    if (message.has_backend_params()) {
      if (cma_audio_) {
        LOG(INFO) << "Received stream metadata after initialization, there must"
                  << " be an update to the audio config.";
        cma_audio_->UpdateAudioConfig(message.backend_params());
        return true;
      }

      pushed_eos_ = false;
      cma_audio_.reset(new audio_output_service::CmaBackendShim(
          weak_factory_.GetWeakPtr(),
          base::SequencedTaskRunner::GetCurrentDefault(),
          receiver_->media_task_runner(), message.backend_params(),
          receiver_->cma_backend_factory()));
    }

    if (message.has_set_start_timestamp()) {
      if (!cma_audio_) {
        LOG(INFO) << "Can't start before stream is set up.";
        ResetConnection();
        return false;
      }
      cma_audio_->StartPlayingFrom(message.set_start_timestamp().start_pts());
    }

    if (message.has_stop_playback()) {
      if (!cma_audio_) {
        LOG(INFO) << "Can't stop before stream is set up.";
        ResetConnection();
        return false;
      }
      cma_audio_->Stop();
    }

    if (message.has_set_stream_volume()) {
      if (!cma_audio_) {
        LOG(INFO) << "Can't set volume before stream is set up.";
        ResetConnection();
        return false;
      }
      cma_audio_->SetVolumeMultiplier(message.set_stream_volume().volume());
    }

    if (message.has_set_playback_rate()) {
      if (!cma_audio_) {
        LOG(INFO) << "Can't set playback rate before stream is set up.";
        ResetConnection();
        return false;
      }
      cma_audio_->SetPlaybackRate(message.set_playback_rate().playback_rate());
    }

    if (message.has_eos_played_out()) {
      // Explicit EOS.
      return HandleAudioData(nullptr, 0, INT64_MIN);
    }

    return true;
  }

  bool HandleAudioData(char* data, size_t size, int64_t timestamp) override {
    last_receive_time_ = base::TimeTicks::Now();
    inactivity_timer_.Reset();

    if (!cma_audio_) {
      LOG(INFO) << "Received audio before stream metadata; ignoring";
      ResetConnection();
      return false;
    }

    if (size == 0) {
      pushed_eos_ = true;
    }
    cma_audio_->AddData(data, size, timestamp);
    stopped_receiving_ = true;
    return false;  // Don't receive any more messages until the buffer is
                   // pushed.
  }

  void OnConnectionError() override {
    LOG(INFO) << "Connection lost for " << this;
    receiver_->RemoveStream(this);
  }

 private:
  void OnInactivityTimeout() {
    LOG(INFO) << "Timed out " << this
              << " due to inactivity; now = " << base::TimeTicks::Now()
              << ", last send = " << last_send_time_
              << ", last receive = " << last_receive_time_;
    receiver_->RemoveStream(this);
  }

  // CmaBackendShim::Delegate implementation:
  void OnBackendInitialized(bool success) override {
    audio_output_service::Generic generic;
    generic.mutable_backend_initialization_status()->set_status(
        success ? audio_output_service::BackendInitializationStatus::SUCCESS
                : audio_output_service::BackendInitializationStatus::ERROR);
    socket_->SendProto(kBackendInitialized, generic);
    last_send_time_ = base::TimeTicks::Now();
  }

  void OnBufferPushed() override {
    if (!stopped_receiving_) {
      return;
    }
    stopped_receiving_ = false;
    socket_->ReceiveMoreMessages();
  }

  void UpdateMediaTimeAndRenderingDelay(
      int64_t media_timestamp_microseconds,
      int64_t reference_timestamp_microseconds,
      int64_t delay_microseconds,
      int64_t delay_timestamp_microseconds) override {
    audio_output_service::CurrentMediaTimestamp message;
    message.set_media_timestamp_microseconds(media_timestamp_microseconds);
    message.set_reference_timestamp_microseconds(
        reference_timestamp_microseconds);
    message.set_delay_microseconds(delay_microseconds);
    message.set_delay_timestamp_microseconds(delay_timestamp_microseconds);
    audio_output_service::Generic generic;
    *(generic.mutable_current_media_timestamp()) = message;
    socket_->SendProto(kUpdateMediaTime, generic);
    last_send_time_ = base::TimeTicks::Now();
  }

  void OnAudioPlaybackError() override {
    LOG(INFO) << "Audio playback error for " << this;
    receiver_->RemoveStream(this);
  }

  void ResetConnection() {
    // Reset will cause the deletion of |this|, so it is better to post a task.
    base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE, base::BindOnce(&Stream::OnAudioPlaybackError,
                                  weak_factory_.GetWeakPtr()));
  }

  AudioOutputServiceReceiver* const receiver_;
  const std::unique_ptr<OutputSocket> socket_;

  base::OneShotTimer inactivity_timer_;

  std::unique_ptr<audio_output_service::CmaBackendShim,
                  audio_output_service::CmaBackendShim::Deleter>
      cma_audio_;
  bool pushed_eos_ = false;
  bool stopped_receiving_ = false;

  base::TimeTicks last_send_time_;
  base::TimeTicks last_receive_time_;

  base::WeakPtrFactory<Stream> weak_factory_{this};
};

AudioOutputServiceReceiver::AudioOutputServiceReceiver(
    CmaBackendFactory* cma_backend_factory,
    scoped_refptr<base::SingleThreadTaskRunner> media_task_runner)
    : Receiver(
          audio_output_service::kDefaultAudioOutputServiceUnixDomainSocketPath,
          audio_output_service::kDefaultAudioOutputServiceTcpPort),
      cma_backend_factory_(cma_backend_factory),
      media_task_runner_(std::move(media_task_runner)) {
  DCHECK(cma_backend_factory_);
  DCHECK(media_task_runner_);
}

AudioOutputServiceReceiver::~AudioOutputServiceReceiver() = default;

void AudioOutputServiceReceiver::CreateOutputStream(
    std::unique_ptr<OutputSocket> socket,
    const Generic& message) {
  auto stream = std::make_unique<Stream>(this, std::move(socket));
  Stream* ptr = stream.get();
  streams_[ptr] = std::move(stream);
  ptr->HandleMetadata(message);
}

void AudioOutputServiceReceiver::RemoveStream(Stream* stream) {
  streams_.erase(stream);
}

}  // namespace audio_output_service
}  // namespace media
}  // namespace chromecast