// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chromecast/media/audio/mixer_service/output_stream_connection.h"

#include <limits>
#include <memory>
#include <string>
#include <utility>

#include "base/logging.h"
#include "base/memory/aligned_memory.h"
#include "chromecast/media/audio/mixer_service/mixer_service_transport.pb.h"
#include "chromecast/media/audio/net/common.pb.h"
#include "chromecast/media/audio/net/conversions.h"
#include "chromecast/metrics/metrics_recorder.h"
#include "chromecast/net/io_buffer_pool.h"

namespace chromecast {
namespace media {
namespace mixer_service {

namespace {
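
// Size in bytes of a single audio frame (one sample for each channel).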
int GetFrameSize(const OutputStreamParams& params) {
  return audio_service::GetSampleSizeBytes(params.sample_format()) *
         params.num_channels();
}
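
// Number of frames to fill per buffer; defaults to 10 ms of audio if the
// client did not request a specific fill size.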
int GetFillSizeFrames(const OutputStreamParams& params) {
  if (params.has_fill_size_frames()) {
    return params.fill_size_frames();
  }
  // Use 10 ms by default.
  return params.sample_rate() / 100;
}
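
// Type tags for the protos sent to the mixer via MixerSocket::SendProto().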
enum MessageTypes : int {
  kInitial = 1,
  kStartTimestamp,
  kPlaybackRate,
  kAudioClockRate,
  kStreamVolume,
  kPauseResume,
  kEndOfStream,
  kTimestampAdjustment,
};

}  // namespace
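
// Each pooled buffer is sized to hold the audio message header plus one full
// fill worth of audio data.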
OutputStreamConnection::OutputStreamConnection(Delegate* delegate,
                                               const OutputStreamParams& params)
    : delegate_(delegate),
      params_(std::make_unique<OutputStreamParams>(params)),
      frame_size_(GetFrameSize(params)),
      fill_size_frames_(GetFillSizeFrames(params)),
      buffer_pool_(base::MakeRefCounted<IOBufferPool>(
          MixerSocket::kAudioMessageHeaderSize +
              fill_size_frames_ * frame_size_,
          std::numeric_limits<size_t>::max(),
          true /* threadsafe */)),
      audio_buffer_(buffer_pool_->GetBuffer()) {
  DCHECK(delegate_);
  DCHECK_GT(params_->sample_rate(), 0);
  DCHECK_GT(params_->num_channels(), 0);
  params_->set_fill_size_frames(fill_size_frames_);
}

OutputStreamConnection::~OutputStreamConnection() = default;

void OutputStreamConnection::Connect() {
  MixerConnection::Connect();
}
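
// Sends the buffer most recently filled by the delegate to the mixer, then
// grabs a fresh buffer from the pool for the next fill.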
void OutputStreamConnection::SendNextBuffer(int filled_frames, int64_t pts) {
  SendAudioBuffer(std::move(audio_buffer_), filled_frames, pts);
  audio_buffer_ = buffer_pool_->GetBuffer();
}
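
// Sends |filled_frames| frames of audio to the mixer. A buffer with zero
// filled frames signals end-of-stream; once that has been sent, no further
// audio is sent on this stream.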
void OutputStreamConnection::SendAudioBuffer(
    scoped_refptr<net::IOBuffer> audio_buffer,
    int filled_frames,
    int64_t pts) {
  if (!socket_) {
    LOG(INFO) << "Tried to send buffer without a connection";
    return;
  }
  if (sent_eos_) {
    // Don't send any more data after the EOS buffer.
    return;
  }

  if (filled_frames == 0) {
    // Send explicit end-of-stream message.
    sent_eos_ = true;
    Generic message;
    message.mutable_eos_played_out();
    socket_->SendProto(kEndOfStream, message);
    return;
  }

  if (socket_->SendAudioBuffer(std::move(audio_buffer),
                               filled_frames * frame_size_, pts)) {
    LOG_IF(INFO, dropping_audio_) << "Stopped dropping audio";
    dropping_audio_ = false;
  } else {
    LOG_IF(WARNING, !dropping_audio_) << "Dropping audio";
    dropping_audio_ = true;
  }
}
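
// The setters below cache the most recent value so it can be replayed when a
// (re)connection is made, and forward it immediately if a socket is already
// connected.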
void OutputStreamConnection::SetVolumeMultiplier(float multiplier) {
  volume_multiplier_ = multiplier;
  if (socket_) {
    Generic message;
    message.mutable_set_stream_volume()->set_volume(multiplier);
    socket_->SendProto(kStreamVolume, message);
  }
}

void OutputStreamConnection::SetStartTimestamp(int64_t start_timestamp,
                                               int64_t pts) {
  start_timestamp_ = start_timestamp;
  start_pts_ = pts;
  if (socket_) {
    Generic message;
    message.mutable_set_start_timestamp()->set_start_timestamp(start_timestamp);
    message.mutable_set_start_timestamp()->set_start_pts(pts);
    socket_->SendProto(kStartTimestamp, message);
  }
}

void OutputStreamConnection::SetPlaybackRate(float playback_rate) {
  playback_rate_ = playback_rate;
  if (socket_) {
    Generic message;
    message.mutable_set_playback_rate()->set_playback_rate(playback_rate);
    socket_->SendProto(kPlaybackRate, message);
  }
}

void OutputStreamConnection::SetAudioClockRate(double rate) {
  audio_clock_rate_ = rate;
  if (socket_) {
    Generic message;
    message.mutable_set_audio_clock_rate()->set_rate(rate);
    socket_->SendProto(kAudioClockRate, message);
  }
}

void OutputStreamConnection::Pause() {
  paused_ = true;
  if (socket_) {
    Generic message;
    message.mutable_set_paused()->set_paused(true);
    socket_->SendProto(kPauseResume, message);
  }
}

void OutputStreamConnection::Resume() {
  paused_ = false;
  if (socket_) {
    Generic message;
    message.mutable_set_paused()->set_paused(false);
    socket_->SendProto(kPauseResume, message);
  }
}
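
// Timestamp adjustments are not cached; if there is no active connection the
// adjustment is simply dropped.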
void OutputStreamConnection::SendTimestampAdjustment(
    int64_t timestamp_adjustment) {
  if (socket_) {
    Generic message;
    auto* adjustment_message = message.mutable_timestamp_adjustment();
    adjustment_message->set_adjustment(timestamp_adjustment);
    socket_->SendProto(kTimestampAdjustment, message);
  }
}
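
// Called once a connection to the mixer has been established. Sends the
// stream parameters along with any non-default state that was set before the
// connection existed, then asks the delegate to fill the first buffer.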
void OutputStreamConnection::OnConnected(std::unique_ptr<MixerSocket> socket) {
  socket_ = std::move(socket);
  socket_->SetDelegate(this);

  Generic message;
  *(message.mutable_output_stream_params()) = *params_;
  if (start_timestamp_ != std::numeric_limits<int64_t>::min()) {
    message.mutable_set_start_timestamp()->set_start_timestamp(
        start_timestamp_);
    message.mutable_set_start_timestamp()->set_start_pts(start_pts_);
  }
  if (playback_rate_ != 1.0f) {
    message.mutable_set_playback_rate()->set_playback_rate(playback_rate_);
  }
  if (audio_clock_rate_ != 1.0) {
    message.mutable_set_audio_clock_rate()->set_rate(audio_clock_rate_);
  }
  if (volume_multiplier_ != 1.0f) {
    message.mutable_set_stream_volume()->set_volume(volume_multiplier_);
  }
  if (paused_) {
    message.mutable_set_paused()->set_paused(true);
  }
  socket_->SendProto(kInitial, message);

  delegate_->FillNextBuffer(
      audio_buffer_->data() + MixerSocket::kAudioMessageHeaderSize,
      fill_size_frames_, std::numeric_limits<int64_t>::min(), 0);
}
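
// If the connection is lost after end-of-stream was sent, report playback as
// complete; otherwise try to reconnect to the mixer.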
void OutputStreamConnection::OnConnectionError() {
  socket_.reset();
  if (sent_eos_) {
    delegate_->OnEosPlayed();
    return;
  }
  MixerConnection::Connect();
}
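
// Handles metadata from the mixer: push results trigger the next buffer fill
// request, ready-for-playback and error notifications are forwarded to the
// delegate, and underruns are both reported and recorded as metrics.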
bool OutputStreamConnection::HandleMetadata(const Generic& message) {
  if (message.has_eos_played_out()) {
    delegate_->OnEosPlayed();
    return true;
  }

  if (message.has_push_result() && !sent_eos_) {
    delegate_->FillNextBuffer(
        audio_buffer_->data() + MixerSocket::kAudioMessageHeaderSize,
        fill_size_frames_, message.push_result().delay_timestamp(),
        message.push_result().delay());
  }
  if (message.has_ready_for_playback()) {
    delegate_->OnAudioReadyForPlayback(
        message.ready_for_playback().delay_microseconds());
  }
  if (message.has_error()) {
    delegate_->OnMixerError();
  }
  if (message.has_mixer_underrun()) {
    std::string metric_name =
        (message.mixer_underrun().type() == MixerUnderrun::INPUT_UNDERRUN
             ? "Platform.Audio.Mixer.StreamUnderrun"
             : "Platform.Audio.Mixer.OutputUnderrun");
    std::unique_ptr<CastEventBuilder> event = CreateCastEvent(metric_name);
    delegate_->ProcessCastEvent(event.get());
    RecordCastEvent(metric_name, std::move(event),
                    /* verbose_log_level = */ 0);
    delegate_->OnMixerUnderrun(static_cast<Delegate::MixerUnderrunType>(
        message.mixer_underrun().type()));
  }
  return true;
}

}  // namespace mixer_service
}  // namespace media
}  // namespace chromecast