chromium/chromecast/media/audio/cast_audio_output_stream.cc

// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chromecast/media/audio/cast_audio_output_stream.h"

#include <string>
#include <utility>

#include "base/bits.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/functional/callback_helpers.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_pump_type.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/bind_post_task.h"
#include "base/task/single_thread_task_runner.h"
#include "base/time/time.h"
#include "chromecast/base/bind_to_task_runner.h"
#include "chromecast/base/metrics/cast_metrics_helper.h"
#include "chromecast/media/api/cma_backend_factory.h"
#include "chromecast/media/audio/cast_audio_manager.h"
#include "chromecast/media/audio/cast_audio_output_utils.h"
#include "chromecast/media/audio/cma_audio_output_stream.h"
#include "chromecast/media/audio/mixer_service/mixer_service_transport.pb.h"
#include "chromecast/media/audio/mixer_service/output_stream_connection.h"
#include "chromecast/media/audio/net/common.pb.h"
#include "chromecast/public/cast_media_shlib.h"
#include "chromecast/public/media/decoder_config.h"
#include "chromecast/public/media/media_pipeline_device_params.h"
#include "chromecast/public/volume_control.h"
#include "media/audio/audio_device_description.h"

#define POST_TO_CMA_WRAPPER(method, ...)                                      \
  do {                                                                        \
    DCHECK(cma_wrapper_);                                                     \
    audio_manager_->media_task_runner()->PostTask(                            \
        FROM_HERE,                                                            \
        base::BindOnce(&CmaAudioOutputStream::method,                         \
                       base::Unretained(cma_wrapper_.get()), ##__VA_ARGS__)); \
  } while (0)

#define POST_TO_MIXER_SERVICE_WRAPPER(method, ...)                     \
  do {                                                                 \
    DCHECK(mixer_service_wrapper_);                                    \
    mixer_service_wrapper_->io_task_runner()->PostTask(                \
        FROM_HERE,                                                     \
        base::BindOnce(&MixerServiceWrapper::method,                   \
                       base::Unretained(mixer_service_wrapper_.get()), \
                       ##__VA_ARGS__));                                \
  } while (0)

namespace {
// Below are settings for MixerService and the DirectAudio it uses.
constexpr base::TimeDelta kFadeTime = base::Milliseconds(5);
constexpr base::TimeDelta kCommunicationsMaxBufferedFrames =
    base::Milliseconds(50);
constexpr base::TimeDelta kMediaMaxBufferedFrames = base::Milliseconds(70);
}  // namespace

namespace chromecast {
namespace media {
namespace {

AudioContentType GetContentType(const std::string& device_id) {
  if (::media::AudioDeviceDescription::IsCommunicationsDevice(device_id)) {
    return AudioContentType::kCommunication;
  }
  return AudioContentType::kMedia;
}

audio_service::ContentType ConvertContentType(AudioContentType content_type) {
  switch (content_type) {
    case AudioContentType::kMedia:
      return audio_service::CONTENT_TYPE_MEDIA;
    case AudioContentType::kCommunication:
      return audio_service::CONTENT_TYPE_COMMUNICATION;
    default:
      NOTREACHED_IN_MIGRATION();
      return audio_service::CONTENT_TYPE_MEDIA;
  }
}

}  // namespace

class CastAudioOutputStream::MixerServiceWrapper
    : public mixer_service::OutputStreamConnection::Delegate {
 public:
  MixerServiceWrapper(const ::media::AudioParameters& audio_params,
                      const std::string& device_id);

  MixerServiceWrapper(const MixerServiceWrapper&) = delete;
  MixerServiceWrapper& operator=(const MixerServiceWrapper&) = delete;

  ~MixerServiceWrapper() override = default;

  void SetRunning(bool running);
  void Start(AudioSourceCallback* source_callback);
  void Stop(base::WaitableEvent* finished);
  void Close(base::OnceClosure closure);
  void SetVolume(double volume);
  int64_t GetMaxBufferedFrames();
  void Flush();

  base::SingleThreadTaskRunner* io_task_runner() {
    return io_task_runner_.get();
  }

 private:
  // mixer_service::OutputStreamConnection::Delegate implementation:
  void FillNextBuffer(void* buffer,
                      int frames,
                      int64_t delay_timestamp,
                      int64_t delay) override;
  // We don't push an EOS buffer.
  void OnEosPlayed() override { NOTREACHED_IN_MIGRATION(); }

  const ::media::AudioParameters audio_params_;
  const std::string device_id_;
  std::unique_ptr<::media::AudioBus> audio_bus_;
  AudioSourceCallback* source_callback_;
  std::unique_ptr<mixer_service::OutputStreamConnection> mixer_connection_;
  double volume_;
  int64_t max_buffered_frames_;

  base::Lock running_lock_;
  bool running_ = true;
  // MixerServiceWrapper must run on an "io thread".
  base::Thread io_thread_;
  // Task runner on |io_thread_|.
  scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_;
  THREAD_CHECKER(io_thread_checker_);
};

CastAudioOutputStream::MixerServiceWrapper::MixerServiceWrapper(
    const ::media::AudioParameters& audio_params,
    const std::string& device_id)
    : audio_params_(audio_params),
      device_id_(device_id),
      source_callback_(nullptr),
      volume_(1.0f),
      max_buffered_frames_(GetMaxBufferedFrames()),
      io_thread_("CastAudioOutputStream IO") {
  DETACH_FROM_THREAD(io_thread_checker_);

  base::Thread::Options options;
  options.message_pump_type = base::MessagePumpType::IO;
  options.thread_type = base::ThreadType::kRealtimeAudio;
  CHECK(io_thread_.StartWithOptions(std::move(options)));
  io_task_runner_ = io_thread_.task_runner();
  DCHECK(io_task_runner_);
}

void CastAudioOutputStream::MixerServiceWrapper::SetRunning(bool running) {
  base::AutoLock lock(running_lock_);
  running_ = running;
}

void CastAudioOutputStream::MixerServiceWrapper::Start(
    AudioSourceCallback* source_callback) {
  DCHECK_CALLED_ON_VALID_THREAD(io_thread_checker_);

  mixer_service::OutputStreamParams params;
  params.set_content_type(audio_service::CONTENT_TYPE_MEDIA);
  params.set_focus_type(ConvertContentType(GetContentType(device_id_)));
  params.set_device_id(device_id_);
  params.set_stream_type(
      mixer_service::OutputStreamParams::STREAM_TYPE_DEFAULT);
  params.set_sample_format(audio_service::SAMPLE_FORMAT_FLOAT_P);
  params.set_sample_rate(audio_params_.sample_rate());
  params.set_num_channels(audio_params_.channels());

  params.set_start_threshold_frames(max_buffered_frames_);
  params.set_max_buffered_frames(max_buffered_frames_);
  params.set_fill_size_frames(audio_params_.frames_per_buffer());

  params.set_fade_frames(::media::AudioTimestampHelper::TimeToFrames(
      kFadeTime, audio_params_.sample_rate()));
  params.set_use_start_timestamp(false);

  source_callback_ = source_callback;
  mixer_connection_ =
      std::make_unique<mixer_service::OutputStreamConnection>(this, params);
  mixer_connection_->Connect();
  mixer_connection_->SetVolumeMultiplier(volume_);
}

void CastAudioOutputStream::MixerServiceWrapper::Stop(
    base::WaitableEvent* finished) {
  DCHECK_CALLED_ON_VALID_THREAD(io_thread_checker_);
  mixer_connection_.reset();
  source_callback_ = nullptr;
  if (finished) {
    finished->Signal();
  }
}

void CastAudioOutputStream::MixerServiceWrapper::Flush() {
  DCHECK_CALLED_ON_VALID_THREAD(io_thread_checker_);
  // Nothing to do.
  return;
}

void CastAudioOutputStream::MixerServiceWrapper::Close(
    base::OnceClosure closure) {
  DCHECK_CALLED_ON_VALID_THREAD(io_thread_checker_);
  Stop(nullptr);
  std::move(closure).Run();
}

int64_t CastAudioOutputStream::MixerServiceWrapper::GetMaxBufferedFrames() {
  int fill_size_frames = audio_params_.frames_per_buffer();
  base::TimeDelta target_max_buffered_ms = kMediaMaxBufferedFrames;
  if (GetContentType(device_id_) == AudioContentType::kCommunication) {
    target_max_buffered_ms = kCommunicationsMaxBufferedFrames;
  }

  int64_t target_max_buffered_frames =
      ::media::AudioTimestampHelper::TimeToFrames(target_max_buffered_ms,
                                                  audio_params_.sample_rate());

  // Calculate the buffer size necessary to achieve at least the desired buffer
  // duration, while minimizing latency.
  int64_t max_buffered_frames = 0;
  if (fill_size_frames > target_max_buffered_frames) {
    max_buffered_frames = target_max_buffered_frames;
  } else {
    // Find the largest multiple of |fill_size_frames| that is still no larger
    // than |target_max_buffered_frames|.
    max_buffered_frames =
        (target_max_buffered_frames / fill_size_frames) * fill_size_frames;
  }

  if (max_buffered_frames != target_max_buffered_frames) {
    max_buffered_frames += 1;
  }

  return max_buffered_frames;
}

void CastAudioOutputStream::MixerServiceWrapper::SetVolume(double volume) {
  DCHECK_CALLED_ON_VALID_THREAD(io_thread_checker_);
  volume_ = volume;

  if (mixer_connection_)
    mixer_connection_->SetVolumeMultiplier(volume_);
}

void CastAudioOutputStream::MixerServiceWrapper::FillNextBuffer(
    void* buffer,
    int frames,
    int64_t delay_timestamp,
    int64_t delay) {
  DCHECK_CALLED_ON_VALID_THREAD(io_thread_checker_);

  // Round down to closest multiple of 4 to ensure correct channel alignment.
  frames = base::bits::AlignDownDeprecatedDoNotUse(frames, 4);

  // Acquire running_lock_ for the scope of this fill call to
  // prevent the source callback from closing the output stream
  // mid-fill.
  base::AutoLock lock(running_lock_);

  // Do not fill more buffers if we have stopped running.
  if (!running_)
    return;

  int64_t playout_timestamp =
      (delay_timestamp == INT64_MIN ? INT64_MIN : delay_timestamp + delay);
  if (playout_timestamp < 0) {
    // Assume any negative timestamp is invalid.
    playout_timestamp = 0;
  }

  // Wrap the data buffer so we can write directly into it.
  if (!audio_bus_) {
    audio_bus_ = ::media::AudioBus::CreateWrapper(audio_params_.channels());
  }
  float* channel_data = static_cast<float*>(buffer);
  for (int c = 0; c < audio_params_.channels(); ++c) {
    audio_bus_->SetChannelData(c, channel_data + c * frames);
  }
  audio_bus_->set_frames(frames);

  base::TimeDelta reported_delay = ::media::AudioTimestampHelper::FramesToTime(
      max_buffered_frames_, audio_params_.sample_rate());
  base::TimeTicks reported_delay_timestamp =
      base::TimeTicks() + base::Microseconds(playout_timestamp);

  int frames_filled = source_callback_->OnMoreData(
      reported_delay, reported_delay_timestamp, {}, audio_bus_.get());
  DCHECK_EQ(frames_filled, frames);
  mixer_connection_->SendNextBuffer(frames);
}

CastAudioOutputStream::CastAudioOutputStream(
    CastAudioManagerHelper* audio_manager,
    const ::media::AudioParameters& audio_params,
    const std::string& device_id_or_group_id,
    bool use_mixer_service)
    : volume_(1.0),
      audio_thread_state_(AudioOutputState::kClosed),
      audio_manager_(audio_manager),
      audio_params_(audio_params),
      device_id_(IsValidDeviceId(device_id_or_group_id)
                     ? device_id_or_group_id
                     : ::media::AudioDeviceDescription::kDefaultDeviceId),
      group_id_(GetGroupId(device_id_or_group_id)),
      use_mixer_service_(use_mixer_service),
      audio_weak_factory_(this) {
  DCHECK(audio_manager_);
  DETACH_FROM_THREAD(audio_thread_checker_);
  DVLOG(1) << __func__ << " " << this << " created from group_id=" << group_id_
           << " with audio_params=" << audio_params_.AsHumanReadableString();
  audio_weak_this_ = audio_weak_factory_.GetWeakPtr();
}

CastAudioOutputStream::~CastAudioOutputStream() {
  DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_);
}

bool CastAudioOutputStream::Open() {
  DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_);
  DVLOG(1) << this << ": " << __func__;
  if (audio_thread_state_ != AudioOutputState::kClosed)
    return false;

  // Sanity check the audio parameters.
  ::media::AudioParameters::Format format = audio_params_.format();
  DCHECK((format == ::media::AudioParameters::AUDIO_PCM_LINEAR) ||
         (format == ::media::AudioParameters::AUDIO_PCM_LOW_LATENCY));
  ::media::ChannelLayout channel_layout = audio_params_.channel_layout();
  if ((channel_layout != ::media::CHANNEL_LAYOUT_MONO) &&
      (channel_layout != ::media::CHANNEL_LAYOUT_STEREO)) {
    LOG(WARNING) << "Unsupported channel layout: " << channel_layout;
    return false;
  }
  DCHECK_GE(audio_params_.channels(), 1);
  DCHECK_LE(audio_params_.channels(), 2);

  const std::string application_session_id =
      audio_manager_->GetSessionId(group_id_);
  LOG_IF(WARNING, application_session_id.empty()) << "Session id is empty.";
  DVLOG(1) << this << ": " << __func__
           << ", session_id=" << application_session_id;

  if (!use_mixer_service_) {
    cma_wrapper_ = std::make_unique<CmaAudioOutputStream>(
        audio_params_, audio_params_.GetBufferDuration(), device_id_,
        audio_manager_->GetCmaBackendFactory());
    POST_TO_CMA_WRAPPER(Initialize, application_session_id);
  } else {
    DCHECK(!(audio_params_.effects() & ::media::AudioParameters::MULTIZONE));

    mixer_service_wrapper_ =
        std::make_unique<MixerServiceWrapper>(audio_params_, device_id_);
  }

  audio_thread_state_ = AudioOutputState::kOpened;

  return true;
}

void CastAudioOutputStream::Close() {
  DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_);
  DVLOG(1) << this << ": " << __func__;

  audio_thread_state_ = AudioOutputState::kPendingClose;
  base::OnceClosure finish_callback = BindToCurrentThread(
      base::BindOnce(&CastAudioOutputStream::FinishClose, audio_weak_this_));

  if (mixer_service_wrapper_) {
    // Synchronously set running to false to guarantee that
    // AudioSourceCallback::OnMoreData() will not be called anymore.
    mixer_service_wrapper_->SetRunning(false);
    POST_TO_MIXER_SERVICE_WRAPPER(
        Close,
        base::BindPostTask(audio_manager_->audio_manager()->GetTaskRunner(),
                           std::move(finish_callback)));
  } else if (cma_wrapper_) {
    // Synchronously set running to false to guarantee that
    // AudioSourceCallback::OnMoreData() will not be called anymore.
    cma_wrapper_->SetRunning(false);
    POST_TO_CMA_WRAPPER(Close, std::move(finish_callback));
  } else {
    std::move(finish_callback).Run();
  }
}

void CastAudioOutputStream::FinishClose() {
  DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_);
  // Signal to the manager that we're closed and can be removed.
  // This should be the last call during the close process as it deletes "this".
  audio_manager_->audio_manager()->ReleaseOutputStream(this);
}

void CastAudioOutputStream::Start(AudioSourceCallback* source_callback) {
  DCHECK(source_callback);
  DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_);
  // We allow calls to start even in the unopened state.
  DCHECK_NE(audio_thread_state_, AudioOutputState::kPendingClose);
  DVLOG(2) << this << ": " << __func__;
  audio_thread_state_ = AudioOutputState::kStarted;
  metrics::CastMetricsHelper::GetInstance()->LogTimeToFirstAudio();

  // |cma_wrapper_| and |mixer_service_wrapper_| cannot be both active.
  DCHECK(!(cma_wrapper_ && mixer_service_wrapper_));

  if (cma_wrapper_) {
    POST_TO_CMA_WRAPPER(Start, source_callback);
  } else {
    POST_TO_MIXER_SERVICE_WRAPPER(Start, source_callback);
  }
}

void CastAudioOutputStream::Stop() {
  DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_);
  DVLOG(2) << this << ": " << __func__;
  // We allow calls to stop even in the unstarted/unopened state.
  if (audio_thread_state_ != AudioOutputState::kStarted)
    return;
  audio_thread_state_ = AudioOutputState::kOpened;

  // |cma_wrapper_| and |mixer_service_wrapper_| cannot be both active.
  DCHECK(!(cma_wrapper_ && mixer_service_wrapper_));

  base::WaitableEvent finished;
  if (cma_wrapper_) {
    POST_TO_CMA_WRAPPER(Stop, &finished);
  } else if (mixer_service_wrapper_) {
    POST_TO_MIXER_SERVICE_WRAPPER(Stop, &finished);
  } else {
    finished.Signal();
  }
  finished.Wait();
}

void CastAudioOutputStream::Flush() {
  DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_);
  DVLOG(2) << this << ": " << __func__;

  // |cma_wrapper_| and |mixer_service_wrapper_| cannot be both active.
  DCHECK(!(cma_wrapper_ && mixer_service_wrapper_));

  if (cma_wrapper_) {
    // Make sure this is not on the same thread as CMA_WRAPPER to prevent
    // deadlock.
    DCHECK(!audio_manager_->media_task_runner()->BelongsToCurrentThread());

    base::WaitableEvent finished;
    POST_TO_CMA_WRAPPER(Flush, base::Unretained(&finished));
    finished.Wait();
  } else if (mixer_service_wrapper_) {
    POST_TO_MIXER_SERVICE_WRAPPER(Flush);
  }
}

void CastAudioOutputStream::SetVolume(double volume) {
  DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_);
  DCHECK_NE(audio_thread_state_, AudioOutputState::kPendingClose);
  DVLOG(2) << this << ": " << __func__ << "(" << volume << ")";
  volume_ = volume;

  DCHECK(!(cma_wrapper_ && mixer_service_wrapper_));

  if (cma_wrapper_) {
    POST_TO_CMA_WRAPPER(SetVolume, volume);
  } else {
    POST_TO_MIXER_SERVICE_WRAPPER(SetVolume, volume);
  }
}

void CastAudioOutputStream::GetVolume(double* volume) {
  DCHECK_CALLED_ON_VALID_THREAD(audio_thread_checker_);
  *volume = volume_;
}

}  // namespace media
}  // namespace chromecast