chromium/chromecast/media/cma/backend/audio_decoder_for_mixer.cc

// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chromecast/media/cma/backend/audio_decoder_for_mixer.h"

#include <algorithm>
#include <limits>

#include "base/functional/callback_helpers.h"
#include "base/logging.h"
#include "base/time/time.h"
#include "base/trace_event/trace_event.h"
#include "build/build_config.h"
#include "chromecast/base/chromecast_switches.h"
#include "chromecast/base/task_runner_impl.h"
#include "chromecast/media/api/decoder_buffer_base.h"
#include "chromecast/media/audio/mixer_service/mixer_service_transport.pb.h"
#include "chromecast/media/audio/mixer_service/mixer_socket.h"
#include "chromecast/media/audio/net/conversions.h"
#include "chromecast/media/base/default_monotonic_clock.h"
#include "chromecast/media/cma/backend/media_pipeline_backend_for_mixer.h"
#include "chromecast/media/cma/base/decoder_buffer_adapter.h"
#include "chromecast/media/cma/base/decoder_config_adapter.h"
#include "chromecast/net/io_buffer_pool.h"
#include "chromecast/public/media/cast_decoder_buffer.h"
#include "media/base/audio_bus.h"
#include "media/base/audio_timestamp_helper.h"
#include "media/base/channel_layout.h"
#include "media/base/decoder_buffer.h"
#include "media/base/sample_format.h"
#include "media/filters/audio_renderer_algorithm.h"

#define TRACE_FUNCTION_ENTRY0() TRACE_EVENT0("cma", __FUNCTION__)

#define TRACE_FUNCTION_ENTRY1(arg1) \
  TRACE_EVENT1("cma", __FUNCTION__, #arg1, arg1)

#define TRACE_FUNCTION_ENTRY2(arg1, arg2) \
  TRACE_EVENT2("cma", __FUNCTION__, #arg1, arg1, #arg2, arg2)

namespace chromecast {
namespace media {

namespace {

const int kInitialFillSizeFrames = 512;
const double kPlaybackRateEpsilon = 0.001;

constexpr double kMinAudioClockRate = 0.99;
constexpr double kMaxAudioClockRate = 1.01;

const int64_t kDefaultInputQueueMs = 200;
constexpr base::TimeDelta kFadeTime = base::Milliseconds(5);
const int kDefaultStartThresholdMs = 70;

const CastAudioDecoder::OutputFormat kDecoderSampleFormat =
    CastAudioDecoder::kOutputPlanarFloat;

const int64_t kInvalidTimestamp = std::numeric_limits<int64_t>::min();

const int64_t kNoPendingOutput = -1;

constexpr int kAudioMessageHeaderSize =
    mixer_service::MixerSocket::kAudioMessageHeaderSize;

// TODO(jameswest): Replace numeric playout channel with AudioChannel enum in
// mixer.
int ToPlayoutChannel(AudioChannel audio_channel) {
  switch (audio_channel) {
    case AudioChannel::kAll:
      return kChannelAll;
    case AudioChannel::kLeft:
      return 0;
    case AudioChannel::kRight:
      return 1;
  }
  NOTREACHED_IN_MIGRATION();
  return kChannelAll;
}

int MaxQueuedFrames(int sample_rate) {
  static int64_t queue_ms = GetSwitchValueNonNegativeInt(
      switches::kMixerSourceInputQueueMs, kDefaultInputQueueMs);

  return ::media::AudioTimestampHelper::TimeToFrames(
      base::Milliseconds(queue_ms), sample_rate);
}

int StartThreshold(int sample_rate) {
  static int64_t start_threshold_ms = GetSwitchValueNonNegativeInt(
      switches::kMixerSourceAudioReadyThresholdMs, kDefaultStartThresholdMs);

  return ::media::AudioTimestampHelper::TimeToFrames(
      base::Milliseconds(start_threshold_ms), sample_rate);
}

}  // namespace

// static
bool MediaPipelineBackend::AudioDecoder::RequiresDecryption() {
  return true;
}

AudioDecoderForMixer::AudioDecoderForMixer(
    MediaPipelineBackendForMixer* backend)
    : backend_(backend),
      task_runner_(backend->GetTaskRunner()),
      buffer_pool_frames_(kInitialFillSizeFrames),
      pending_output_frames_(kNoPendingOutput),
      pool_(new ::media::AudioBufferMemoryPool()),
      weak_factory_(this) {
  TRACE_FUNCTION_ENTRY0();
  DCHECK(backend_);
  DCHECK(task_runner_.get());
  DCHECK(task_runner_->BelongsToCurrentThread());
}

AudioDecoderForMixer::~AudioDecoderForMixer() {
  TRACE_FUNCTION_ENTRY0();
  DCHECK(task_runner_->BelongsToCurrentThread());
}

void AudioDecoderForMixer::SetDelegate(
    MediaPipelineBackend::Decoder::Delegate* delegate) {
  DCHECK(delegate);
  delegate_ = delegate;
}

void AudioDecoderForMixer::Initialize() {
  TRACE_FUNCTION_ENTRY0();
  DCHECK(delegate_);
  stats_ = Statistics();
  pending_buffer_complete_ = false;
  pending_output_frames_ = kNoPendingOutput;
  paused_ = false;
  playback_rate_ = 1.0f;
  reported_ready_for_playback_ = false;
  mixer_delay_ = RenderingDelay();

  next_buffer_delay_ = RenderingDelay();
  last_push_pts_ = kInvalidTimestamp;
  last_push_playout_timestamp_ = kInvalidTimestamp;
}

bool AudioDecoderForMixer::Start(int64_t playback_start_pts,
                                 bool av_sync_enabled) {
  TRACE_FUNCTION_ENTRY0();
  DCHECK(IsValidConfig(input_config_));

  // Create decoder_ if necessary. This can happen if Stop() was called, and
  // SetConfig() was not called since then.
  if (!decoder_) {
    CreateDecoder();
  }
  decoded_config_ = (decoder_ ? decoder_->GetOutputConfig() : input_config_);
  CreateMixerInput(decoded_config_, av_sync_enabled);
  playback_start_pts_ = playback_start_pts;
  av_sync_enabled_ = av_sync_enabled;

  return true;
}

void AudioDecoderForMixer::CreateBufferPool(const AudioConfig& config,
                                            int frame_count) {
  DCHECK_GT(frame_count, 0);
  buffer_pool_frames_ = frame_count;
  buffer_pool_ = base::MakeRefCounted<IOBufferPool>(
      frame_count * sizeof(float) * config.channel_number +
          kAudioMessageHeaderSize,
      std::numeric_limits<size_t>::max(), true /* threadsafe */);
  buffer_pool_->Preallocate(1);
}

void AudioDecoderForMixer::CreateMixerInput(const AudioConfig& config,
                                            bool av_sync_enabled) {
  CreateBufferPool(config, buffer_pool_frames_);
  DCHECK_GT(buffer_pool_frames_, 0);

  mixer_service::OutputStreamParams params;
  params.set_stream_type(
      backend_->Primary()
          ? mixer_service::OutputStreamParams::STREAM_TYPE_DEFAULT
          : mixer_service::OutputStreamParams::STREAM_TYPE_SFX);
  params.set_content_type(
      audio_service::ConvertContentType(backend_->ContentType()));
  params.set_sample_format(audio_service::SAMPLE_FORMAT_FLOAT_P);
  params.set_device_id(backend_->DeviceId());
  params.set_sample_rate(config.samples_per_second);
  params.set_num_channels(config.channel_number);
  params.set_channel_selection(ToPlayoutChannel(backend_->AudioChannel()));
  params.set_fill_size_frames(buffer_pool_frames_);
  params.set_start_threshold_frames(StartThreshold(config.samples_per_second));
  params.set_max_buffered_frames(MaxQueuedFrames(config.samples_per_second));
  params.set_fade_frames(::media::AudioTimestampHelper::TimeToFrames(
      kFadeTime, config.samples_per_second));
  params.set_use_start_timestamp(av_sync_enabled);
  params.set_enable_audio_clock_simulation(av_sync_enabled);

  mixer_input_ =
      std::make_unique<mixer_service::OutputStreamConnection>(this, params);
  mixer_input_->SetVolumeMultiplier(volume_multiplier_);
  mixer_input_->SetAudioClockRate(av_sync_clock_rate_);
  mixer_input_->SetPlaybackRate(playback_rate_);
  mixer_input_->Connect();
}

void AudioDecoderForMixer::StartPlaybackAt(int64_t playback_start_timestamp) {
  LOG(INFO) << __func__
            << " playback_start_timestamp_=" << playback_start_timestamp;
  mixer_input_->SetStartTimestamp(playback_start_timestamp,
                                  playback_start_pts_);
}

void AudioDecoderForMixer::RestartPlaybackAt(int64_t pts, int64_t timestamp) {
  LOG(INFO) << __func__ << " pts=" << pts << " timestamp=" << timestamp;

  next_buffer_delay_ = RenderingDelay();
  last_push_playout_timestamp_ = kInvalidTimestamp;
  mixer_input_->SetStartTimestamp(timestamp, pts);
}

AudioDecoderForMixer::RenderingDelay
AudioDecoderForMixer::GetMixerRenderingDelay() {
  return mixer_delay_;
}

void AudioDecoderForMixer::Stop() {
  TRACE_FUNCTION_ENTRY0();
  decoder_.reset();
  mixer_input_.reset();
  weak_factory_.InvalidateWeakPtrs();

  Initialize();
}

bool AudioDecoderForMixer::Pause() {
  TRACE_FUNCTION_ENTRY0();
  DCHECK(mixer_input_);
  paused_ = true;
  mixer_input_->Pause();
  return true;
}

bool AudioDecoderForMixer::Resume() {
  TRACE_FUNCTION_ENTRY0();
  DCHECK(mixer_input_);
  mixer_input_->Resume();
  next_buffer_delay_ = RenderingDelay();
  last_push_playout_timestamp_ = kInvalidTimestamp;
  paused_ = false;
  return true;
}

float AudioDecoderForMixer::SetPlaybackRate(float rate) {
  if (std::abs(rate - 1.0) < kPlaybackRateEpsilon) {
    // AudioRendererAlgorithm treats values close to 1 as exactly 1.
    rate = 1.0f;
  }
  LOG(INFO) << "SetPlaybackRate to " << rate;
  if (rate <= 0) {
    LOG(ERROR) << "Invalid playback rate " << rate;
    rate = 1.0f;
  }

  playback_rate_ = rate;
  mixer_input_->SetPlaybackRate(rate);
  backend_->NewAudioPlaybackRateInEffect(rate);

  return rate;
}

double AudioDecoderForMixer::SetAvSyncPlaybackRate(double rate) {
  av_sync_clock_rate_ =
      std::max(kMinAudioClockRate, std::min(rate, kMaxAudioClockRate));
  if (mixer_input_)
    mixer_input_->SetAudioClockRate(av_sync_clock_rate_);
  return av_sync_clock_rate_;
}

bool AudioDecoderForMixer::GetTimestampedPts(int64_t* timestamp,
                                             int64_t* pts) const {
  if (paused_ || last_push_playout_timestamp_ == kInvalidTimestamp ||
      last_push_pts_ == kInvalidTimestamp) {
    return false;
  }

  // Note: timestamp may be slightly in the future.
  *timestamp = last_push_playout_timestamp_;
  *pts = last_push_pts_;
  return true;
}

int64_t AudioDecoderForMixer::GetCurrentPts() const {
  return last_push_pts_;
}

AudioDecoderForMixer::BufferStatus AudioDecoderForMixer::PushBuffer(
    CastDecoderBuffer* buffer) {
  TRACE_FUNCTION_ENTRY0();
  DCHECK(task_runner_->BelongsToCurrentThread());
  DCHECK(buffer);
  DCHECK(!mixer_error_);
  DCHECK(!pending_buffer_complete_);
  DCHECK(mixer_input_);

  uint64_t input_bytes = buffer->end_of_stream() ? 0 : buffer->data_size();
  scoped_refptr<DecoderBufferBase> buffer_base(
      static_cast<DecoderBufferBase*>(buffer));

  // If the buffer is already decoded, do not attempt to decode. Call
  // OnBufferDecoded asynchronously on the main thread.
  if (BypassDecoder()) {
    DCHECK(!decoder_);
    task_runner_->PostTask(
        FROM_HERE, base::BindOnce(&AudioDecoderForMixer::OnBufferDecoded,
                                  weak_factory_.GetWeakPtr(), input_bytes,
                                  false /* has_config */,
                                  CastAudioDecoder::Status::kDecodeOk,
                                  AudioConfig(), buffer_base));
    return MediaPipelineBackend::kBufferPending;
  }

  if (!decoder_) {
    return MediaPipelineBackend::kBufferFailed;
  }

  // Decode the buffer.
  decoder_->Decode(std::move(buffer_base),
                   base::BindOnce(&AudioDecoderForMixer::OnBufferDecoded,
                                  base::Unretained(this), input_bytes,
                                  true /* has_config */));
  return MediaPipelineBackend::kBufferPending;
}

void AudioDecoderForMixer::UpdateStatistics(Statistics delta) {
  DCHECK(task_runner_->BelongsToCurrentThread());
  stats_.decoded_bytes += delta.decoded_bytes;
}

void AudioDecoderForMixer::GetStatistics(Statistics* stats) {
  TRACE_FUNCTION_ENTRY0();
  DCHECK(stats);
  DCHECK(task_runner_->BelongsToCurrentThread());
  *stats = stats_;
}

bool AudioDecoderForMixer::SetConfig(const AudioConfig& config) {
  TRACE_FUNCTION_ENTRY0();
  DCHECK(task_runner_->BelongsToCurrentThread());
  if (!IsValidConfig(config)) {
    LOG(ERROR) << "Invalid audio config passed to SetConfig";
    return false;
  }

  input_config_ = config;
  decoder_.reset();
  CreateDecoder();

  auto decoded_config =
      (decoder_ ? decoder_->GetOutputConfig() : input_config_);
  bool changed_config =
      (decoded_config.samples_per_second !=
           decoded_config_.samples_per_second ||
       decoded_config.channel_number != decoded_config_.channel_number);
  decoded_config_ = decoded_config;
  if (mixer_input_ && changed_config) {
    ResetMixerInputForNewConfig(decoded_config);
  }

  if (pending_buffer_complete_ && changed_config) {
    pending_buffer_complete_ = false;
    delegate_->OnPushBufferComplete(MediaPipelineBackend::kBufferSuccess);
  }
  return true;
}

void AudioDecoderForMixer::ResetMixerInputForNewConfig(
    const AudioConfig& config) {
  // Destroy the old input first to ensure that the mixer output sample rate
  // is updated.
  mixer_input_.reset();

  pending_output_frames_ = kNoPendingOutput;
  next_buffer_delay_ = AudioDecoderForMixer::RenderingDelay();
  int64_t last_timestamp = last_push_playout_timestamp_;
  last_push_playout_timestamp_ = kInvalidTimestamp;

  CreateMixerInput(config, av_sync_enabled_);
  if (av_sync_enabled_) {
    if (last_timestamp != kInvalidTimestamp &&
        last_push_pts_ != kInvalidTimestamp) {
      mixer_input_->SetStartTimestamp(last_timestamp, last_push_pts_);
    } else {
      // Pause + resume starts playback immediately.
      mixer_input_->Pause();
      mixer_input_->Resume();
    }
  }
}

void AudioDecoderForMixer::CreateDecoder() {
  DCHECK(!decoder_);
  DCHECK(IsValidConfig(input_config_));

  // No need to create a decoder if the samples are already decoded.
  if (BypassDecoder()) {
    LOG(INFO) << "Data is not coded. Decoder will not be used.";
    return;
  }

  // Create a decoder.
  decoder_ = CastAudioDecoder::Create(task_runner_, input_config_,
                                      kDecoderSampleFormat);
  if (!decoder_) {
    LOG(ERROR) << "Failed to create audio decoder";
    delegate_->OnDecoderError();
  }
}

bool AudioDecoderForMixer::SetVolume(float multiplier) {
  TRACE_FUNCTION_ENTRY1(multiplier);
  DCHECK(task_runner_->BelongsToCurrentThread());
  volume_multiplier_ = multiplier;
  if (mixer_input_)
    mixer_input_->SetVolumeMultiplier(volume_multiplier_);
  return true;
}

AudioDecoderForMixer::RenderingDelay AudioDecoderForMixer::GetRenderingDelay() {
  TRACE_FUNCTION_ENTRY0();
  if (paused_) {
    return RenderingDelay();
  }

  AudioDecoderForMixer::RenderingDelay delay = next_buffer_delay_;
  if (delay.timestamp_microseconds != INT64_MIN) {
    double usec_per_sample = 1000000.0 / decoded_config_.samples_per_second;
    double queued_output_frames = 0.0;

    // Account for data that is in the process of being pushed to the mixer.
    if (pending_output_frames_ != kNoPendingOutput) {
      queued_output_frames += pending_output_frames_;
    }
    delay.delay_microseconds += queued_output_frames * usec_per_sample;
  }

  return delay;
}

AudioDecoderForMixer::AudioTrackTimestamp
AudioDecoderForMixer::GetAudioTrackTimestamp() {
  return AudioTrackTimestamp();
}

int AudioDecoderForMixer::GetStartThresholdInFrames() {
  return 0;
}

void AudioDecoderForMixer::OnBufferDecoded(
    uint64_t input_bytes,
    bool has_config,
    CastAudioDecoder::Status status,
    const AudioConfig& config,
    scoped_refptr<DecoderBufferBase> decoded) {
  TRACE_FUNCTION_ENTRY0();
  DCHECK(task_runner_->BelongsToCurrentThread());
  DCHECK(!pending_buffer_complete_);

  if (!mixer_input_) {
    LOG(DFATAL) << "Buffer pushed before Start() or after Stop()";
    return;
  }
  if (status == CastAudioDecoder::Status::kDecodeError) {
    LOG(ERROR) << "Decode error";
    delegate_->OnPushBufferComplete(MediaPipelineBackend::kBufferFailed);
    return;
  }
  if (mixer_error_) {
    delegate_->OnPushBufferComplete(MediaPipelineBackend::kBufferFailed);
    return;
  }

  Statistics delta;
  delta.decoded_bytes = input_bytes;
  UpdateStatistics(delta);

  if (has_config) {
    bool changed_config = false;
    if (config.samples_per_second != decoded_config_.samples_per_second) {
      LOG(INFO) << "Input sample rate changed from "
                << decoded_config_.samples_per_second << " to "
                << config.samples_per_second;
      decoded_config_.samples_per_second = config.samples_per_second;
      changed_config = true;
    }
    if (config.channel_number != decoded_config_.channel_number) {
      LOG(INFO) << "Input channel count changed from "
                << decoded_config_.channel_number << " to "
                << config.channel_number;
      decoded_config_.channel_number = config.channel_number;
      changed_config = true;
    }
    if (changed_config) {
      // Config from actual stream doesn't match supposed config from the
      // container. Update the mixer.
      ResetMixerInputForNewConfig(decoded_config_);
    }
  }

  if (!decoded->end_of_stream()) {
    pending_output_frames_ =
        decoded->data_size() / (decoded_config_.channel_number * sizeof(float));
    last_push_pts_ = decoded->timestamp();
    last_push_playout_timestamp_ =
        (next_buffer_delay_.timestamp_microseconds == kInvalidTimestamp
             ? kInvalidTimestamp
             : next_buffer_delay_.timestamp_microseconds +
                   next_buffer_delay_.delay_microseconds);
  }
  WritePcm(std::move(decoded));
}

void AudioDecoderForMixer::CheckBufferComplete() {
  if (!pending_buffer_complete_) {
    return;
  }

  pending_buffer_complete_ = false;
  delegate_->OnPushBufferComplete(MediaPipelineBackend::kBufferSuccess);
}

bool AudioDecoderForMixer::BypassDecoder() const {
  DCHECK(task_runner_->BelongsToCurrentThread());
  // The mixer input requires planar float PCM data.
  return (input_config_.codec == kCodecPCM &&
          input_config_.sample_format == kSampleFormatPlanarF32);
}

void AudioDecoderForMixer::WritePcm(scoped_refptr<DecoderBufferBase> buffer) {
  DCHECK(mixer_input_);
  DCHECK(buffer_pool_);
  DCHECK(!pending_buffer_complete_);

  if (buffer->end_of_stream()) {
    pending_buffer_complete_ = true;
    mixer_input_->SendAudioBuffer(buffer_pool_->GetBuffer(), 0, 0);
    return;
  }

  const int frame_size = sizeof(float) * decoded_config_.channel_number;
  const int original_frame_count = buffer->data_size() / frame_size;
  if (original_frame_count == 0) {
    // Don't send empty buffers since it is interpreted as EOS.
    delegate_->OnPushBufferComplete(MediaPipelineBackend::kBufferSuccess);
    return;
  }

  const int frame_count = buffer->data_size() / frame_size;
  DCHECK_GT(frame_count, 0);

  if (frame_count > buffer_pool_frames_) {
    CreateBufferPool(decoded_config_, frame_count * 2);
  }

  auto io_buffer = buffer_pool_->GetBuffer();
  memcpy(io_buffer->data() + kAudioMessageHeaderSize, buffer->data(),
         buffer->data_size());

  pending_buffer_complete_ = true;
  mixer_input_->SendAudioBuffer(std::move(io_buffer), frame_count,
                                buffer->timestamp());
}

void AudioDecoderForMixer::FillNextBuffer(void* buffer,
                                          int frames,
                                          int64_t delay_timestamp,
                                          int64_t delay) {
  DCHECK(task_runner_->BelongsToCurrentThread());
  pending_output_frames_ = kNoPendingOutput;
  next_buffer_delay_ = RenderingDelay(delay, delay_timestamp);
  CheckBufferComplete();
}

void AudioDecoderForMixer::OnAudioReadyForPlayback(int64_t mixer_delay) {
  DCHECK(task_runner_->BelongsToCurrentThread());
  mixer_delay_ = RenderingDelay(mixer_delay, MonotonicClockNow());
  if (reported_ready_for_playback_) {
    return;
  }
  reported_ready_for_playback_ = true;
  backend_->OnAudioReadyForPlayback();
}

void AudioDecoderForMixer::OnEosPlayed() {
  DCHECK(task_runner_->BelongsToCurrentThread());
  CheckBufferComplete();
  delegate_->OnEndOfStream();
}

void AudioDecoderForMixer::OnMixerError() {
  DCHECK(task_runner_->BelongsToCurrentThread());
  mixer_error_ = true;
  delegate_->OnDecoderError();
}

}  // namespace media
}  // namespace chromecast