chromium/chromecast/media/cma/pipeline/media_pipeline_impl.cc

// Copyright 2014 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chromecast/media/cma/pipeline/media_pipeline_impl.h"

#include <stdint.h>

#include <algorithm>
#include <limits>
#include <utility>

#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/functional/callback_helpers.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/task/single_thread_task_runner.h"
#include "base/time/default_tick_clock.h"
#include "chromecast/base/metrics/cast_metrics_helper.h"
#include "chromecast/media/api/cma_backend.h"
#include "chromecast/media/cdm/cast_cdm_context.h"
#include "chromecast/media/cma/base/buffering_controller.h"
#include "chromecast/media/cma/base/buffering_state.h"
#include "chromecast/media/cma/base/coded_frame_provider.h"
#include "chromecast/media/cma/pipeline/audio_pipeline_impl.h"
#include "chromecast/media/cma/pipeline/cma_pipeline_buildflags.h"
#include "chromecast/media/cma/pipeline/media_pipeline_observer.h"
#include "chromecast/media/cma/pipeline/video_pipeline_impl.h"
#include "media/base/timestamp_constants.h"

namespace chromecast {
namespace media {

namespace {

// Low/high buffering thresholds used when load_type is kLoadTypeURL.
constexpr base::TimeDelta kLowBufferThresholdURL = base::Milliseconds(2000);
constexpr base::TimeDelta kHighBufferThresholdURL = base::Milliseconds(6000);

// Low/high buffering thresholds used when load_type is kLoadTypeMediaSource.
constexpr base::TimeDelta kLowBufferThresholdMediaSource =
    base::Milliseconds(0);
constexpr base::TimeDelta kHighBufferThresholdMediaSource =
    base::Milliseconds(1000);

// Delay between two consecutive media time updates.
constexpr base::TimeDelta kTimeUpdateInterval = base::Milliseconds(250);

// Statistics are refreshed once every |kStatisticsUpdatePeriod| media time
// updates, i.e. every kTimeUpdateInterval * kStatisticsUpdatePeriod.
constexpr int kStatisticsUpdatePeriod = 4;

// How long (in milliseconds) playback must be stalled before a playback stall
// event is reported.
constexpr int kPlaybackStallEventThresholdMs = 2500;

void LogEstimatedBitrate(int decoded_bytes,
                         base::TimeDelta elapsed_time,
                         const char* tag,
                         const char* metric) {
  int estimated_bitrate_in_kbps =
      8 * decoded_bytes / elapsed_time.InMilliseconds();

  if (estimated_bitrate_in_kbps <= 0)
    return;

  LOG(INFO) << "Estimated " << tag << " bitrate is "
            << estimated_bitrate_in_kbps << " kbps";
  metrics::CastMetricsHelper* metrics_helper =
      metrics::CastMetricsHelper::GetInstance();
  metrics_helper->RecordApplicationEventWithValue(metric,
                                                  estimated_bitrate_in_kbps);
}

}  // namespace

// Bookkeeping for one in-flight Flush() request.
struct MediaPipelineImpl::FlushTask {
  // Set once the corresponding stream has finished flushing (or immediately
  // when that stream does not exist). Default to false so that a freshly
  // allocated task never holds indeterminate values.
  bool audio_flushed = false;
  bool video_flushed = false;

  // Posted to the current task runner once both streams are flushed.
  base::OnceClosure done_cb;
};

// Constructs a pipeline with no backend, no CDM and no audio/video decoders.
// The backend is supplied later through Initialize().
MediaPipelineImpl::MediaPipelineImpl()
    : cdm_context_(nullptr),
      backend_state_(BACKEND_STATE_UNINITIALIZED),
      playback_rate_(0),
      audio_decoder_(nullptr),
      video_decoder_(nullptr),
      pending_time_update_task_(false),
      last_media_time_(::media::kNoTimestamp),
      statistics_rolling_counter_(0),
      audio_bytes_for_bitrate_estimation_(0),
      video_bytes_for_bitrate_estimation_(0),
      playback_stalled_(false),
      playback_stalled_notification_sent_(false),
      media_time_interpolator_(base::DefaultTickClock::GetInstance()),
      weak_factory_(this) {
  LOG(INFO) << __FUNCTION__;
  // Cache a weak pointer; it is what gets bound into every posted task and
  // callback in this file.
  weak_this_ = weak_factory_.GetWeakPtr();
  // NOTE(review): presumably the object is created on a different thread than
  // the one it is subsequently used on -- confirm against the callers.
  thread_checker_.DetachFromThread();
}

// Notifies observers of the destruction and, if the backend had ever left the
// uninitialized/initialized states, records the "ended" application event.
MediaPipelineImpl::~MediaPipelineImpl() {
  LOG(INFO) << __FUNCTION__;
  DCHECK(thread_checker_.CalledOnValidThread());

  // TODO(b/67112414): Do something better than this.
  MediaPipelineObserver::NotifyPipelineDestroyed(this);

  const bool backend_never_started =
      backend_state_ == BACKEND_STATE_UNINITIALIZED ||
      backend_state_ == BACKEND_STATE_INITIALIZED;
  if (backend_never_started)
    return;

  metrics::CastMetricsHelper::GetInstance()->RecordApplicationEvent(
      "Cast.Platform.Ended");
}

// Takes ownership of |media_pipeline_backend| and, when buffering is enabled
// for a URL or MediaSource load, creates the buffering controller with the
// thresholds matching |load_type|.
void MediaPipelineImpl::Initialize(
    LoadType load_type,
    std::unique_ptr<CmaBackend> media_pipeline_backend,
    bool is_buffering_enabled) {
  LOG(INFO) << __FUNCTION__;
  DCHECK(thread_checker_.CalledOnValidThread());
  media_pipeline_backend_ = std::move(media_pipeline_backend);

  const bool is_media_source = (load_type == kLoadTypeMediaSource);
  const bool is_bufferable_load_type =
      (load_type == kLoadTypeURL) || is_media_source;
  if (!is_bufferable_load_type || !is_buffering_enabled)
    return;

  // MediaSource loads use tighter thresholds than plain URL loads.
  const base::TimeDelta low_threshold = is_media_source
                                            ? kLowBufferThresholdMediaSource
                                            : kLowBufferThresholdURL;
  const base::TimeDelta high_threshold = is_media_source
                                             ? kHighBufferThresholdMediaSource
                                             : kHighBufferThresholdURL;

  scoped_refptr<BufferingConfig> buffering_config(
      new BufferingConfig(low_threshold, high_threshold));
  buffering_controller_ = std::make_unique<BufferingController>(
      buffering_config,
      base::BindRepeating(&MediaPipelineImpl::OnBufferingNotification,
                          weak_this_));
}

// Stores the set of callbacks used to report pipeline events to the client.
// |client| must provide non-null error and buffering-state callbacks.
void MediaPipelineImpl::SetClient(MediaPipelineClient client) {
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK(!client.error_cb.is_null());
  DCHECK(!client.buffering_state_cb.is_null());
  client_ = std::move(client);
}

// Overload taking a CDM id. Not implemented: it only logs the id and hits
// NOTIMPLEMENTED(); the pipeline state is left untouched.
void MediaPipelineImpl::SetCdm(const base::UnguessableToken* cdm_id) {
  LOG(INFO) << __FUNCTION__
            << " cdm_id=" << ::media::CdmContext::CdmIdToString(cdm_id);
  DCHECK(thread_checker_.CalledOnValidThread());
  NOTIMPLEMENTED();
  // TODO(gunsch): Setting the CDM from an id is not implemented.
  // One possibility would be a GetCdmByIdCB that's passed in.
}

// Remembers |cdm_context| and hands it to the audio and video pipelines that
// already exist. Pipelines created later pick it up when they are initialized.
void MediaPipelineImpl::SetCdm(CastCdmContext* cdm_context) {
  LOG(INFO) << __FUNCTION__;
  DCHECK(thread_checker_.CalledOnValidThread());

  cdm_context_ = cdm_context;

  if (audio_pipeline_) {
    audio_pipeline_->SetCdm(cdm_context_);
  }
  if (video_pipeline_) {
    video_pipeline_->SetCdm(cdm_context_);
  }
}

// Creates the backend audio decoder and the audio pipeline wrapping it, then
// initializes that pipeline with |config| and |frame_provider|. Returns
// PIPELINE_ERROR_ABORT when the backend cannot provide an audio decoder,
// otherwise the status of the audio pipeline initialization.
::media::PipelineStatus MediaPipelineImpl::InitializeAudio(
    const ::media::AudioDecoderConfig& config,
    AvPipelineClient client,
    std::unique_ptr<CodedFrameProvider> frame_provider) {
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK(!audio_decoder_);

  audio_decoder_ = media_pipeline_backend_->CreateAudioDecoder();
  if (audio_decoder_ == nullptr)
    return ::media::PIPELINE_ERROR_ABORT;

  audio_pipeline_ =
      std::make_unique<AudioPipelineImpl>(audio_decoder_, std::move(client));
  // A CDM may have been set before the audio pipeline existed.
  if (cdm_context_ != nullptr)
    audio_pipeline_->SetCdm(cdm_context_);

  ::media::PipelineStatus status =
      audio_pipeline_->Initialize(config, std::move(frame_provider));
  if (!status.is_ok())
    return status;

  // TODO(b/67112414): Do something better than this.
  MediaPipelineObserver::NotifyAudioPipelineInitialized(this, config);
  return status;
}

// Creates the backend video decoder and the video pipeline wrapping it, then
// initializes that pipeline with |configs| and |frame_provider|. Returns
// PIPELINE_ERROR_ABORT when the backend cannot provide a video decoder,
// otherwise the status of the video pipeline initialization.
::media::PipelineStatus MediaPipelineImpl::InitializeVideo(
    const std::vector<::media::VideoDecoderConfig>& configs,
    VideoPipelineClient client,
    std::unique_ptr<CodedFrameProvider> frame_provider) {
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK(!video_decoder_);

  video_decoder_ = media_pipeline_backend_->CreateVideoDecoder();
  if (video_decoder_ == nullptr)
    return ::media::PIPELINE_ERROR_ABORT;

  video_pipeline_ =
      std::make_unique<VideoPipelineImpl>(video_decoder_, std::move(client));
  // A CDM may have been set before the video pipeline existed.
  if (cdm_context_ != nullptr)
    video_pipeline_->SetCdm(cdm_context_);

  return video_pipeline_->Initialize(configs, std::move(frame_provider));
}

// Starts playback at media time |time|: lazily initializes and starts the
// backend, kicks off the periodic media time updates, restarts the time
// interpolator and finally starts the audio and video pipelines. Any failure
// is reported through OnError(PIPELINE_ERROR_ABORT).
void MediaPipelineImpl::StartPlayingFrom(base::TimeDelta time) {
  LOG(INFO) << __FUNCTION__ << " t0=" << time.InMilliseconds();
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK(audio_pipeline_ || video_pipeline_);
  DCHECK(!pending_flush_task_);

  // The backend is only initialized the first time playback is requested.
  if (backend_state_ == BACKEND_STATE_UNINITIALIZED) {
    const bool backend_initialized = media_pipeline_backend_->Initialize();
    if (!backend_initialized) {
      OnError(::media::PIPELINE_ERROR_ABORT);
      return;
    }
    backend_state_ = BACKEND_STATE_INITIALIZED;
  }

  // Start the backend at the requested position.
  const bool backend_started =
      media_pipeline_backend_->Start(time.InMicroseconds());
  if (!backend_started) {
    OnError(::media::PIPELINE_ERROR_ABORT);
    return;
  }
  backend_state_ = BACKEND_STATE_PLAYING;
  ResetBitrateState();
  metrics::CastMetricsHelper::GetInstance()->RecordApplicationEvent(
      "Cast.Platform.Playing");

  // Reset the time tracking state and make sure exactly one time update task
  // is in flight.
  start_media_time_ = time;
  last_media_time_ = ::media::kNoTimestamp;
  statistics_rolling_counter_ = 0;
  if (!pending_time_update_task_) {
    pending_time_update_task_ = true;
    base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE,
        base::BindOnce(&MediaPipelineImpl::UpdateMediaTime, weak_this_));
  }

  waiting_for_first_have_enough_data_ = true;

  media_time_interpolator_.SetBounds(time, time, base::TimeTicks::Now());
  media_time_interpolator_.StartInterpolating();

  // Starts one elementary stream pipeline on the new timeline, registering it
  // with the buffering controller first when there is one.
  const auto start_stream = [this, time](auto* pipeline,
                                         const char* stream_name) {
    scoped_refptr<BufferingState> buffering_state;
    if (buffering_controller_)
      buffering_state = buffering_controller_->AddStream(stream_name);
    return pipeline->StartPlayingFrom(time, buffering_state);
  };

  if (audio_pipeline_ && !start_stream(audio_pipeline_.get(), "audio")) {
    OnError(::media::PIPELINE_ERROR_ABORT);
    return;
  }
  if (video_pipeline_ && !start_stream(video_pipeline_.get(), "video")) {
    OnError(::media::PIPELINE_ERROR_ABORT);
    return;
  }
}

// Flushes the audio and video pipelines. |flush_cb| is posted to the current
// task runner by OnFlushDone() once every existing stream has been flushed
// and the backend has been stopped.
void MediaPipelineImpl::Flush(base::OnceClosure flush_cb) {
  LOG(INFO) << __FUNCTION__;
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK((backend_state_ == BACKEND_STATE_PLAYING) ||
         (backend_state_ == BACKEND_STATE_PAUSED));
  DCHECK(audio_pipeline_ || video_pipeline_);
  DCHECK(!pending_flush_task_);

  media_time_interpolator_.StopInterpolating();

  // The buffering controller only exists when Initialize() enabled buffering
  // for the load type, so it must be null-checked like everywhere else.
  if (buffering_controller_)
    buffering_controller_->Reset();

  // Flush both audio and video pipeline. This will flush the frame
  // provider and stop feeding buffers to the backend.
  // MediaPipelineImpl::OnFlushDone will stop the backend once flush completes.
  pending_flush_task_ = std::make_unique<FlushTask>();
  // A stream that does not exist counts as already flushed.
  pending_flush_task_->audio_flushed = !audio_pipeline_;
  pending_flush_task_->video_flushed = !video_pipeline_;
  pending_flush_task_->done_cb = std::move(flush_cb);
  if (audio_pipeline_) {
    audio_pipeline_->Flush(
        base::BindOnce(&MediaPipelineImpl::OnFlushDone, weak_this_, true));
  }
  if (video_pipeline_) {
    video_pipeline_->Flush(
        base::BindOnce(&MediaPipelineImpl::OnFlushDone, weak_this_, false));
  }
}

// Records the desired playback rate and, unless the pipeline is currently
// buffering, applies it: a zero rate pauses a playing backend, a non-zero rate
// is forwarded to the backend and resumes it if it was paused.
void MediaPipelineImpl::SetPlaybackRate(double rate) {
  LOG(INFO) << __FUNCTION__ << " rate=" << rate;
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK((backend_state_ == BACKEND_STATE_PLAYING) ||
         (backend_state_ == BACKEND_STATE_PAUSED));

  // Always remember the requested rate; OnBufferingNotification() re-applies
  // it once buffering is over.
  playback_rate_ = rate;
  const bool currently_buffering =
      buffering_controller_ && buffering_controller_->IsBuffering();
  if (currently_buffering)
    return;

  if (rate == 0.0f) {
    // A zero rate means "pause"; nothing to do unless we are playing.
    if (backend_state_ != BACKEND_STATE_PLAYING)
      return;
    media_pipeline_backend_->Pause();
    media_time_interpolator_.SetPlaybackRate(0.f);
    backend_state_ = BACKEND_STATE_PAUSED;
    metrics::CastMetricsHelper::GetInstance()->RecordApplicationEvent(
        "Cast.Platform.Pause");
    return;
  }

  media_pipeline_backend_->SetPlaybackRate(rate);
  media_time_interpolator_.SetPlaybackRate(rate);
  if (backend_state_ != BACKEND_STATE_PAUSED)
    return;

  media_pipeline_backend_->Resume();
  backend_state_ = BACKEND_STATE_PLAYING;
  ResetBitrateState();
  metrics::CastMetricsHelper::GetInstance()->RecordApplicationEvent(
      "Cast.Platform.Playing");
}

// Forwards |volume| to the audio pipeline; a no-op when there is no audio.
void MediaPipelineImpl::SetVolume(float volume) {
  LOG(INFO) << __FUNCTION__ << " vol=" << volume;
  DCHECK(thread_checker_.CalledOnValidThread());
  if (!audio_pipeline_)
    return;
  audio_pipeline_->SetVolume(volume);
}

// Returns the current media time: the backend PTS when accurate media time is
// enabled at build time, otherwise the last media time recorded by
// UpdateMediaTime(). Falls back to the playback start time when that value is
// kNoTimestamp.
base::TimeDelta MediaPipelineImpl::GetMediaTime() const {
  DCHECK(thread_checker_.CalledOnValidThread());
#if BUILDFLAG(CMA_USE_ACCURATE_MEDIA_TIME)
  const base::TimeDelta current_time =
      base::Microseconds(media_pipeline_backend_->GetCurrentPts());
#else
  const base::TimeDelta current_time = last_media_time_;
#endif
  if (current_time == ::media::kNoTimestamp)
    return start_media_time_;
  return current_time;
}

// True once an audio pipeline has been created.
bool MediaPipelineImpl::HasAudio() const {
  DCHECK(thread_checker_.CalledOnValidThread());
  return static_cast<bool>(audio_pipeline_);
}

// True once a video pipeline has been created.
bool MediaPipelineImpl::HasVideo() const {
  DCHECK(thread_checker_.CalledOnValidThread());
  return static_cast<bool>(video_pipeline_);
}

// Called when one stream (audio if |is_audio_stream|, video otherwise) has
// finished flushing. Once both streams are flushed, stops the backend and
// posts the completion callback of the pending flush.
void MediaPipelineImpl::OnFlushDone(bool is_audio_stream) {
  LOG(INFO) << __FUNCTION__ << " is_audio_stream=" << is_audio_stream;
  DCHECK(pending_flush_task_);

  if (!is_audio_stream) {
    DCHECK(!pending_flush_task_->video_flushed);
    pending_flush_task_->video_flushed = true;
  } else {
    DCHECK(!pending_flush_task_->audio_flushed);
    pending_flush_task_->audio_flushed = true;
  }

  const bool all_streams_flushed = pending_flush_task_->audio_flushed &&
                                   pending_flush_task_->video_flushed;
  if (!all_streams_flushed)
    return;

  // Stop the backend, so that the backend won't push their pending buffer,
  // which may be invalidated later, to hardware. (b/25342604)
  media_pipeline_backend_->Stop();
  backend_state_ = BACKEND_STATE_INITIALIZED;
  metrics::CastMetricsHelper::GetInstance()->RecordApplicationEvent(
      "Cast.Platform.Ended");

  // Report completion asynchronously, then drop the finished flush task.
  base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE, std::move(pending_flush_task_->done_cb));
  pending_flush_task_.reset();
}

// Invoked by the buffering controller whenever the buffering state flips.
// Reports the new state to the client (after the first "have enough data")
// and pauses the backend while buffering / restores the desired playback rate
// once buffering is over.
void MediaPipelineImpl::OnBufferingNotification(bool is_buffering) {
  LOG(INFO) << __FUNCTION__ << " is_buffering=" << is_buffering;
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK((backend_state_ == BACKEND_STATE_PLAYING) ||
         (backend_state_ == BACKEND_STATE_PAUSED));
  DCHECK(buffering_controller_);
  DCHECK_EQ(is_buffering, buffering_controller_->IsBuffering());

  // The initial wait ends with the first "not buffering" notification and is
  // never re-entered from here.
  waiting_for_first_have_enough_data_ =
      waiting_for_first_have_enough_data_ && is_buffering;

  if (!waiting_for_first_have_enough_data_ && client_.buffering_state_cb) {
    // Reports buffering state to WMPI. WMPI will change HTMLMediaElement ready
    // state:
    // HAVE_NOTHING -> HAVE_CURRENT_DATA
    // HAVE_ENOUGH -> HAVE_FUTURE_DATA or HAVE_ENOUGH_DATA
    // DEMUXER_UNDERFLOW is the only possible reason. We pass encoded audio to
    // the vendor-specific backend. Our buffering controller only reports a
    // buffering state change based on the difference between the current
    // playout PTS reported by the vendor backend and the most recent encoded
    // buffer.
    ::media::BufferingState state = ::media::BUFFERING_HAVE_ENOUGH;
    if (is_buffering)
      state = ::media::BUFFERING_HAVE_NOTHING;
    client_.buffering_state_cb.Run(state, ::media::DEMUXER_UNDERFLOW);
  }

  if (is_buffering) {
    if (backend_state_ == BACKEND_STATE_PLAYING) {
      media_pipeline_backend_->Pause();
      media_time_interpolator_.SetPlaybackRate(0.f);
      backend_state_ = BACKEND_STATE_PAUSED;
      metrics::CastMetricsHelper::GetInstance()->RecordApplicationEvent(
          "Cast.Platform.Pause");
    }
    return;
  }

  if (backend_state_ == BACKEND_STATE_PAUSED) {
    // Once we finish buffering, we need to honour the desired playback rate
    // (rather than just resuming). This way, if playback was paused while
    // buffering, it will remain paused rather than incorrectly resuming.
    SetPlaybackRate(playback_rate_);
  }
}

// Tracks playback stalls. |media_time| is the latest media time and
// |current_stc| the tick time at which it was sampled. Records a single
// "Cast.Platform.PlaybackStall" event per stall once the stall has lasted at
// least kPlaybackStallEventThresholdMs.
void MediaPipelineImpl::CheckForPlaybackStall(base::TimeDelta media_time,
                                              base::TimeTicks current_stc) {
  DCHECK(media_time != ::media::kNoTimestamp);

  // A playback stall means the underlying media pipeline unexpectedly stopped
  // making forward progress. That requires ALL of the following:
  //
  // 1. Media time did not advance since the last update
  // 2. The backend is playing (i.e. not paused)
  // 3. We are not buffering (buffering is captured in a separate event)
  const bool no_forward_progress =
      media_time == last_media_time_ &&
      backend_state_ == BACKEND_STATE_PLAYING &&
      !(buffering_controller_ && buffering_controller_->IsBuffering());

  if (!no_forward_progress) {
    if (playback_stalled_) {
      // Leaving the stalled condition.
      const base::TimeDelta stall_duration =
          current_stc - playback_stalled_time_;
      LOG(INFO) << "Transitioning out of stalled state. Stall duration was "
                << stall_duration.InMilliseconds() << " ms";
      playback_stalled_ = false;
      playback_stalled_notification_sent_ = false;
    }
    return;
  }

  // First update without progress: remember when the stall began.
  if (!playback_stalled_) {
    playback_stalled_ = true;
    playback_stalled_time_ = current_stc;
    return;
  }

  // Already stalled: the event is reported at most once per stall.
  if (playback_stalled_notification_sent_)
    return;

  const base::TimeDelta current_stall_duration =
      current_stc - playback_stalled_time_;
  if (current_stall_duration.InMilliseconds() <
      kPlaybackStallEventThresholdMs) {
    return;
  }

  LOG(INFO) << "Playback stalled";
  metrics::CastMetricsHelper::GetInstance()->RecordApplicationEvent(
      "Cast.Platform.PlaybackStall");
  playback_stalled_notification_sent_ = true;
}

// Periodic task that samples the backend media time, refreshes statistics and
// bitrate estimates, runs stall detection and notifies the client through
// |client_.time_update_cb|. While the backend is playing or paused, the task
// re-posts itself every kTimeUpdateInterval; otherwise it stops.
void MediaPipelineImpl::UpdateMediaTime() {
  // The task that was pending is the one running now.
  pending_time_update_task_ = false;
  if ((backend_state_ != BACKEND_STATE_PLAYING) &&
      (backend_state_ != BACKEND_STATE_PAUSED))
    return;

  // Statistics are only refreshed once every kStatisticsUpdatePeriod runs.
  if (statistics_rolling_counter_ == 0) {
    if (audio_pipeline_)
      audio_pipeline_->UpdateStatistics();
    if (video_pipeline_)
      video_pipeline_->UpdateStatistics();

    // Bitrate estimation only accumulates bytes and time while playing.
    if (backend_state_ == BACKEND_STATE_PLAYING) {
      base::TimeTicks current_time = base::TimeTicks::Now();
      if (audio_pipeline_)
        audio_bytes_for_bitrate_estimation_ +=
            audio_pipeline_->bytes_decoded_since_last_update();
      if (video_pipeline_)
        video_bytes_for_bitrate_estimation_ +=
            video_pipeline_->bytes_decoded_since_last_update();
      elapsed_time_delta_ += current_time - last_sample_time_;
      // Report the estimates once more than 5 seconds have been accumulated,
      // then start a new estimation window.
      if (elapsed_time_delta_.InMilliseconds() > 5000) {
        if (audio_pipeline_)
          LogEstimatedBitrate(audio_bytes_for_bitrate_estimation_,
                              elapsed_time_delta_, "audio",
                              "Cast.Platform.AudioBitrate");
        if (video_pipeline_)
          LogEstimatedBitrate(video_bytes_for_bitrate_estimation_,
                              elapsed_time_delta_, "video",
                              "Cast.Platform.VideoBitrate");
        ResetBitrateState();
      }
      last_sample_time_ = current_time;
    }
  }

  statistics_rolling_counter_ =
      (statistics_rolling_counter_ + 1) % kStatisticsUpdatePeriod;

  // Wait until the first available timestamp returned from backend, which means
  // the actual playback starts. Some of the rest of the logic, mainly media
  // time interpolating, expects a valid timestamp as baseline.
  base::TimeDelta media_time =
      base::Microseconds(media_pipeline_backend_->GetCurrentPts());
  if (media_time == ::media::kNoTimestamp &&
      (last_media_time_ == ::media::kNoTimestamp ||
       !media_time_interpolator_.interpolating())) {
    // No usable timestamp and nothing to interpolate from: just try again
    // after the regular interval.
    pending_time_update_task_ = true;
    base::SingleThreadTaskRunner::GetCurrentDefault()->PostDelayedTask(
        FROM_HERE,
        base::BindOnce(&MediaPipelineImpl::UpdateMediaTime, weak_this_),
        kTimeUpdateInterval);
    return;
  }

  base::TimeTicks stc = base::TimeTicks::Now();

  if (media_time == ::media::kNoTimestamp) {
    // The backend has no timestamp right now; fall back to the interpolated
    // estimate.
    DCHECK(media_time_interpolator_.interpolating());
    media_time = media_time_interpolator_.GetInterpolatedTime();

    LOG(WARNING) << "Backend returns invalid timestamp. Estimated time is "
                 << media_time;
  } else {
    // It's safe to use kInfiniteDuration as upper bound. When pipeline
    // rebuffers, time interpolator is also paused, in which case it returns
    // the timestamp when pausing it.
    media_time_interpolator_.SetBounds(media_time, ::media::kInfiniteDuration,
                                       stc);
  }

  // Must run before |last_media_time_| is updated below, since stall detection
  // compares |media_time| against the previous value.
  CheckForPlaybackStall(media_time, stc);

  base::TimeDelta max_rendering_time = media_time;
  if (buffering_controller_) {
    buffering_controller_->SetMediaTime(media_time);

    // Receiving the same time twice in a row means playback isn't moving,
    // so don't interpolate ahead.
    if (media_time != last_media_time_) {
      max_rendering_time = buffering_controller_->GetMaxRenderingTime();
      if (max_rendering_time == ::media::kNoTimestamp)
        max_rendering_time = media_time;

      // Cap interpolation time to avoid interpolating too far ahead.
      max_rendering_time =
          std::min(max_rendering_time, media_time + 2 * kTimeUpdateInterval);
    }
  }

  last_media_time_ = media_time;
  if (!client_.time_update_cb.is_null())
    client_.time_update_cb.Run(media_time, max_rendering_time, stc);

  // Schedule the next update.
  pending_time_update_task_ = true;
  base::SingleThreadTaskRunner::GetCurrentDefault()->PostDelayedTask(
      FROM_HERE,
      base::BindOnce(&MediaPipelineImpl::UpdateMediaTime, weak_this_),
      kTimeUpdateInterval);
}

// Records |error| as an application event and reports it to the client. The
// client error callback is consumed by the call, so it runs at most once.
void MediaPipelineImpl::OnError(::media::PipelineStatus error) {
  DCHECK(thread_checker_.CalledOnValidThread());
  DCHECK(error != ::media::PIPELINE_OK) << "PIPELINE_OK is not an error!";

  metrics::CastMetricsHelper* metrics_helper =
      metrics::CastMetricsHelper::GetInstance();
  metrics_helper->RecordApplicationEventWithValue("Cast.Platform.Error",
                                                  error.code());

  if (client_.error_cb.is_null())
    return;
  std::move(client_.error_cb).Run(error);
}

// Starts a new bitrate estimation window: clears the accumulated byte counts
// and elapsed time, and takes the current time as the new sampling reference.
void MediaPipelineImpl::ResetBitrateState() {
  last_sample_time_ = base::TimeTicks::Now();
  elapsed_time_delta_ = base::TimeDelta();
  video_bytes_for_bitrate_estimation_ = 0;
  audio_bytes_for_bitrate_estimation_ = 0;
}

}  // namespace media
}  // namespace chromecast