chromium/chromecast/media/audio/mixer_service/receiver/receiver_cma.cc

// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chromecast/media/audio/mixer_service/receiver/receiver_cma.h"

#include <limits>
#include <utility>

#include "base/location.h"
#include "base/logging.h"
#include "base/memory/weak_ptr.h"
#include "base/task/sequenced_task_runner.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "chromecast/media/audio/mixer_service/mixer_socket.h"
#include "chromecast/media/audio/mixer_service/receiver/cma_backend_shim.h"

namespace chromecast {
namespace media {
namespace mixer_service {

namespace {

// Delay used to arm each Stream's inactivity timer. The timer is reset every
// time the stream receives metadata or audio; if it fires, the stream asks the
// receiver to remove it.
constexpr base::TimeDelta kInactivityTimeout = base::Seconds(5);

// Type tags passed to MixerSocket::SendProto() for the messages a Stream sends
// back over its socket.
enum MessageTypes : int {
  // Accompanies a Generic message carrying a push result.
  kPushResult = 1,
  // Accompanies a Generic message carrying an "EOS played out" notification.
  kEndOfStream = 2,
};

}  // namespace

// Owns a socket for a connection type that this receiver does not service.
// The socket is simply held open; when it reports a connection error, this
// object asks the receiver to drop it.
class ReceiverCma::UnusedSocket : public MixerSocket::Delegate {
 public:
  // Takes ownership of |socket| and installs this object as its delegate.
  // Both |receiver| and |socket| must be non-null. |receiver| is stored as a
  // raw pointer (assumed to outlive this object).
  UnusedSocket(ReceiverCma* receiver, std::unique_ptr<MixerSocket> socket)
      : receiver_(receiver), socket_(std::move(socket)) {
    DCHECK(receiver_);
    DCHECK(socket_);

    socket_->SetDelegate(this);
  }

  ~UnusedSocket() override = default;

  // Not copyable or assignable.
  UnusedSocket(const UnusedSocket&) = delete;
  UnusedSocket& operator=(const UnusedSocket&) = delete;

 private:
  // MixerSocket::Delegate implementation:
  void OnConnectionError() override {
    // NOTE(review): RemoveUnusedSocket() erases this object's map entry, which
    // presumably destroys |this|; nothing may touch members after this call.
    receiver_->RemoveUnusedSocket(this);
  }

  // Receiver that tracks this object; not owned.
  ReceiverCma* const receiver_;
  // The otherwise-unused connection, kept alive for this object's lifetime.
  const std::unique_ptr<MixerSocket> socket_;
};

// Connects one client output-stream socket to a CmaBackendShim.
//
// Metadata and audio received on the socket are forwarded to the shim, and the
// shim's buffer-pushed / EOS-played notifications are reported back over the
// socket. The stream asks |receiver_| to remove it when the connection fails,
// when the shim reports a playback error, or when the inactivity timer fires.
class ReceiverCma::Stream : public MixerSocket::Delegate,
                            public CmaBackendShim::Delegate {
 public:
  // Takes ownership of |socket|, installs this object as its delegate and arms
  // the inactivity timer. Both arguments must be non-null. |receiver| is stored
  // as a raw pointer (assumed to outlive this object).
  Stream(ReceiverCma* receiver, std::unique_ptr<MixerSocket> socket)
      : receiver_(receiver), socket_(std::move(socket)), weak_factory_(this) {
    DCHECK(receiver_);
    DCHECK(socket_);
    socket_->SetDelegate(this);
    inactivity_timer_.Start(FROM_HERE, kInactivityTimeout, this,
                            &Stream::OnInactivityTimeout);
  }

  ~Stream() override = default;

  // Not copyable or assignable.
  Stream(const Stream&) = delete;
  Stream& operator=(const Stream&) = delete;

  // MixerSocket::Delegate implementation:

  // Processes a metadata message. In order, it may: create the backend shim
  // from the output stream params, apply a stream volume, and treat an
  // "eos_played_out" field as an explicit end-of-stream. Returns true on every
  // path except the explicit-EOS one, which returns whatever HandleAudioData()
  // returns.
  bool HandleMetadata(const mixer_service::Generic& message) override {
    MarkMessageReceived();

    const bool has_stream_params = message.has_output_stream_params();
    if (has_stream_params && cma_audio_) {
      // The shim already exists; the rest of this message is not processed.
      LOG(INFO) << "Received stream metadata after stream was already set up";
      return true;
    }
    if (has_stream_params) {
      pushed_eos_ = false;
      cma_audio_.reset(new mixer_service::CmaBackendShim(
          weak_factory_.GetWeakPtr(),
          base::SequencedTaskRunner::GetCurrentDefault(),
          message.output_stream_params(), receiver_->backend_manager()));
    }

    const bool has_volume = message.has_set_stream_volume();
    if (has_volume && !cma_audio_) {
      LOG(INFO) << "Can't set volume before stream is set up";
      return true;
    }
    if (has_volume) {
      cma_audio_->SetVolumeMultiplier(message.set_stream_volume().volume());
    }

    if (!message.has_eos_played_out()) {
      return true;
    }
    // Explicit EOS: forwarded as an empty audio buffer.
    return HandleAudioData(nullptr, 0, std::numeric_limits<int64_t>::min());
  }

  // Forwards an audio buffer to the backend shim; a zero |size| marks end of
  // stream. |timestamp| is not used here. Returns true if the data was ignored
  // because no shim exists yet, otherwise false so that no further messages
  // are received until the shim reports the buffer as pushed.
  bool HandleAudioData(char* data, size_t size, int64_t timestamp) override {
    MarkMessageReceived();

    if (!cma_audio_) {
      LOG(INFO) << "Received audio before stream metadata; ignoring";
      return true;
    }

    pushed_eos_ = pushed_eos_ || (size == 0);
    cma_audio_->AddData(data, size);
    // Reception resumes from OnBufferPushed().
    return false;
  }

  void OnConnectionError() override {
    LOG(INFO) << "Connection lost for " << this;
    // NOTE(review): RemoveStream() erases this object's map entry, which
    // presumably destroys |this|; keep it as the final statement.
    receiver_->RemoveStream(this);
  }

 private:
  // Records the receive time and restarts the inactivity countdown.
  void MarkMessageReceived() {
    last_receive_time_ = base::TimeTicks::Now();
    inactivity_timer_.Reset();
  }

  // Sends |proto| to the client tagged with |type| and records the send time.
  void SendAndRecordTime(MessageTypes type,
                         const mixer_service::Generic& proto) {
    socket_->SendProto(type, proto);
    last_send_time_ = base::TimeTicks::Now();
  }

  // Runs when |inactivity_timer_| fires, i.e. no metadata or audio arrived
  // for kInactivityTimeout.
  void OnInactivityTimeout() {
    LOG(INFO) << "Timed out " << this
              << " due to inactivity; now = " << base::TimeTicks::Now()
              << ", last send = " << last_send_time_
              << ", last receive = " << last_receive_time_;
    receiver_->RemoveStream(this);
  }

  // CmaBackendShim::Delegate implementation:

  // Reports the rendering delay to the client (skipped once EOS has been
  // pushed) and then lets the socket deliver further messages.
  void OnBufferPushed(CmaBackendShim::RenderingDelay rendering_delay) override {
    if (!pushed_eos_) {
      mixer_service::Generic generic;
      auto* push_result = generic.mutable_push_result();
      push_result->set_delay_timestamp(rendering_delay.timestamp_microseconds);
      push_result->set_delay(rendering_delay.delay_microseconds);
      SendAndRecordTime(kPushResult, generic);
    }

    socket_->ReceiveMoreMessages();
  }

  // Notifies the client that EOS has played out, then releases the shim so a
  // later stream-params message can set up a new one.
  void PlayedEos() override {
    LOG(INFO) << "EOS played for " << this;
    mixer_service::Generic generic;
    *generic.mutable_eos_played_out() = mixer_service::EosPlayedOut();
    SendAndRecordTime(kEndOfStream, generic);

    cma_audio_.reset();
  }

  void OnAudioPlaybackError() override {
    LOG(INFO) << "Audio playback error for " << this;
    receiver_->RemoveStream(this);
  }

  // Receiver that tracks this stream; not owned.
  ReceiverCma* const receiver_;
  // Connection to the client.
  const std::unique_ptr<MixerSocket> socket_;

  // Fires OnInactivityTimeout() unless reset by incoming messages.
  base::OneShotTimer inactivity_timer_;

  // Backend shim; null until stream params arrive and again after EOS has
  // played out.
  std::unique_ptr<mixer_service::CmaBackendShim,
                  mixer_service::CmaBackendShim::Deleter>
      cma_audio_;
  // True once an empty (EOS) buffer has been handed to |cma_audio_|.
  bool pushed_eos_ = false;

  // Timestamps of the most recent send / receive, used only for logging.
  base::TimeTicks last_send_time_;
  base::TimeTicks last_receive_time_;

  // Declared last so it is destroyed before the other members.
  base::WeakPtrFactory<Stream> weak_factory_;
};

// Stores |backend_manager|, which must be non-null. It is kept as a raw
// pointer and later handed to each CmaBackendShim created for a stream, so it
// is assumed to outlive this receiver.
ReceiverCma::ReceiverCma(MediaPipelineBackendManager* backend_manager)
    : backend_manager_(backend_manager) {
  DCHECK(backend_manager_);
}

// Default destruction; members (including the stream and unused-socket
// containers) are torn down by their own destructors.
ReceiverCma::~ReceiverCma() = default;

void ReceiverCma::CreateOutputStream(std::unique_ptr<MixerSocket> socket,
                                     const Generic& message) {
  auto stream = std::make_unique<Stream>(this, std::move(socket));
  Stream* ptr = stream.get();
  streams_[ptr] = std::move(stream);
  ptr->HandleMetadata(message);
}

// Loopback connections are not serviced by this receiver: the request is
// logged and |socket| is parked via AddUnusedSocket(). |message| is ignored.
void ReceiverCma::CreateLoopbackConnection(std::unique_ptr<MixerSocket> socket,
                                           const Generic& message) {
  LOG(INFO) << "Unhandled loopback connection";
  AddUnusedSocket(std::move(socket));
}

// Audio redirection connections are not serviced by this receiver: the request
// is logged and |socket| is parked via AddUnusedSocket(). |message| is ignored.
void ReceiverCma::CreateAudioRedirection(std::unique_ptr<MixerSocket> socket,
                                         const Generic& message) {
  LOG(INFO) << "Unhandled redirection connection";
  AddUnusedSocket(std::move(socket));
}

// Control connections are not serviced by this receiver: the request is logged
// and |socket| is parked via AddUnusedSocket(). |message| is ignored.
void ReceiverCma::CreateControlConnection(std::unique_ptr<MixerSocket> socket,
                                          const Generic& message) {
  LOG(INFO) << "Unhandled control connection";
  AddUnusedSocket(std::move(socket));
}

// Erases the entry keyed by |stream| from |streams_|.
// NOTE(review): entries are stored from std::make_unique results, so erasing
// presumably destroys the Stream; callers inside Stream must not touch |this|
// afterwards. Confirm against the container type declared in the header.
void ReceiverCma::RemoveStream(Stream* stream) {
  streams_.erase(stream);
}

void ReceiverCma::AddUnusedSocket(std::unique_ptr<MixerSocket> socket) {
  auto unused_socket = std::make_unique<UnusedSocket>(this, std::move(socket));
  UnusedSocket* ptr = unused_socket.get();
  unused_sockets_[ptr] = std::move(unused_socket);
}

// Erases the entry keyed by |unused_socket| from |unused_sockets_|.
// NOTE(review): entries are stored from std::make_unique results, so erasing
// presumably destroys the holder and its socket; confirm against the container
// type declared in the header.
void ReceiverCma::RemoveUnusedSocket(UnusedSocket* unused_socket) {
  unused_sockets_.erase(unused_socket);
}

}  // namespace mixer_service
}  // namespace media
}  // namespace chromecast