chromium/media/renderers/win/media_foundation_stream_wrapper.cc

// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "media/renderers/win/media_foundation_stream_wrapper.h"

#include <mferror.h>

#include "base/functional/bind.h"
#include "base/task/sequenced_task_runner.h"
#include "base/trace_event/base_tracing.h"
#include "media/base/media_switches.h"
#include "media/base/video_codecs.h"
#include "media/base/win/mf_helpers.h"
#include "media/renderers/win/media_foundation_audio_stream.h"
#include "media/renderers/win/media_foundation_source_wrapper.h"
#include "media/renderers/win/media_foundation_video_stream.h"

namespace media {

using Microsoft::WRL::ComPtr;

namespace {

PendingInputBuffer::PendingInputBuffer(DemuxerStream::Status status,
                                       scoped_refptr<DecoderBuffer> buffer)
    : status(status), buffer(std::move(buffer)) {}

PendingInputBuffer::PendingInputBuffer(DemuxerStream::Status status)
    : status(status) {}

PendingInputBuffer::PendingInputBuffer(const PendingInputBuffer& other) =
    default;

PendingInputBuffer::~PendingInputBuffer() = default;

}  // namespace

MediaFoundationStreamWrapper::MediaFoundationStreamWrapper() = default;
MediaFoundationStreamWrapper::~MediaFoundationStreamWrapper() = default;

/*static*/
HRESULT MediaFoundationStreamWrapper::Create(
    int stream_id,
    IMFMediaSource* parent_source,
    DemuxerStream* demuxer_stream,
    std::unique_ptr<MediaLog> media_log,
    scoped_refptr<base::SequencedTaskRunner> task_runner,
    MediaFoundationStreamWrapper** stream_out) {
  DVLOG(1) << __func__ << ": stream_id=" << stream_id;

  ComPtr<MediaFoundationStreamWrapper> stream;
  switch (demuxer_stream->type()) {
    case DemuxerStream::Type::VIDEO:
      RETURN_IF_FAILED(MediaFoundationVideoStream::Create(
          stream_id, parent_source, demuxer_stream, std::move(media_log),
          &stream));
      break;
    case DemuxerStream::Type::AUDIO:
      RETURN_IF_FAILED(MediaFoundationAudioStream::Create(
          stream_id, parent_source, demuxer_stream, std::move(media_log),
          &stream));
      break;
    default:
      DLOG(ERROR) << "Unsupported demuxer stream type: "
                  << demuxer_stream->type();
      return E_INVALIDARG;
  }
  stream->SetTaskRunner(std::move(task_runner));
  *stream_out = stream.Detach();
  return S_OK;
}

HRESULT MediaFoundationStreamWrapper::RuntimeClassInitialize(
    int stream_id,
    IMFMediaSource* parent_source,
    DemuxerStream* demuxer_stream,
    std::unique_ptr<MediaLog> media_log) {
  {
    base::AutoLock auto_lock(lock_);
    parent_source_ = parent_source;
  }
  demuxer_stream_ = demuxer_stream;
  stream_id_ = stream_id;
  stream_type_ = demuxer_stream_->type();

  DVLOG_FUNC(1) << "stream_id=" << stream_id
                << ", stream_type=" << DemuxerStream::GetTypeName(stream_type_);

  media_log_ = std::move(media_log);
  if (base::FeatureList::IsEnabled(kMediaFoundationBatchRead)) {
    if (kBatchReadCount.Get() < 1 || kBatchReadCount.Get() > 500) {
      DLOG(WARNING) << "batch_read_count_=" << kBatchReadCount.Get()
                    << " is out of range [1,500], "
                    << (kBatchReadCount.Get() < 1
                            ? "it shouldn't be negative or 0"
                            : "it will spend more time "
                              "on writing and reading and maybe impacts UX")
                    << "; setting batch_read_count_=1";
      batch_read_count_ = 1;
    } else {
      batch_read_count_ = kBatchReadCount.Get();
    }
  }
  DVLOG_FUNC(1) << "batch_read_count_=" << batch_read_count_;

  RETURN_IF_FAILED(GenerateStreamDescriptor());
  RETURN_IF_FAILED(MFCreateEventQueue(&mf_media_event_queue_));
  return S_OK;
}

void MediaFoundationStreamWrapper::SetTaskRunner(
    scoped_refptr<base::SequencedTaskRunner> task_runner) {
  DVLOG_FUNC(1);

  task_runner_ = std::move(task_runner);
}

void MediaFoundationStreamWrapper::DetachParent() {
  DVLOG_FUNC(1);

  base::AutoLock auto_lock(lock_);
  parent_source_ = nullptr;
}

void MediaFoundationStreamWrapper::DetachDemuxerStream() {
  DVLOG_FUNC(1);
  DCHECK(task_runner_->RunsTasksInCurrentSequence());

  demuxer_stream_ = nullptr;
}

void MediaFoundationStreamWrapper::SetSelected(bool selected) {
  DVLOG_FUNC(2) << "selected=" << selected;

  base::AutoLock auto_lock(lock_);
  selected_ = selected;
}

bool MediaFoundationStreamWrapper::IsSelected() {
  base::AutoLock auto_lock(lock_);
  DVLOG_FUNC(2) << "selected_=" << selected_;

  return selected_;
}

bool MediaFoundationStreamWrapper::IsEnabled() {
  base::AutoLock auto_lock(lock_);
  DVLOG_FUNC(2) << "enabled_=" << enabled_;

  return enabled_;
}

void MediaFoundationStreamWrapper::SetEnabled(bool enabled) {
  DVLOG_FUNC(2) << "enabled=" << enabled;

  {
    base::AutoLock auto_lock(lock_);
    if (enabled_ == enabled)
      return;
    enabled_ = enabled;
  }
  // Restart processing of queued requests when stream is re-enabled.
  ProcessRequestsIfPossible();
}

void MediaFoundationStreamWrapper::SetFlushed(bool flushed) {
  DVLOG_FUNC(2) << "flushed=" << flushed;

  base::AutoLock auto_lock(lock_);
  flushed_ = flushed;
  if (flushed_) {
    DVLOG_FUNC(2) << "flush buffer_queue_";
    buffer_queue_.clear();
    while (!post_flush_buffers_.empty()) {
      post_flush_buffers_.pop();
    }
  }
}

bool MediaFoundationStreamWrapper::HasEnded() const {
  DVLOG_FUNC(2) << "stream_ended_=" << stream_ended_;

  return stream_ended_;
}

void MediaFoundationStreamWrapper::SetLastStartPosition(
    const PROPVARIANT* start_position) {
  // Events such as MF_MEDIA_ENGINE_EVENT_SEEKED may send a start position
  // with VT_EMPTY, this event should be ignored since it is not a valid start
  // time. Only VT_I8 will be used based on start of presentation:
  // https://learn.microsoft.com/en-us/windows/win32/api/mfidl/nf-mfidl-imfmediasession-start
  if (start_position->vt == VT_I8) {
    base::AutoLock auto_lock(lock_);
    last_start_time_ = start_position->hVal.QuadPart;
  }
}

HRESULT MediaFoundationStreamWrapper::QueueStartedEvent(
    const PROPVARIANT* start_position) {
  DVLOG_FUNC(2);

  // Save the new start position in the stream.
  SetLastStartPosition(start_position);

  state_ = State::kStarted;
  RETURN_IF_FAILED(mf_media_event_queue_->QueueEventParamVar(
      MEStreamStarted, GUID_NULL, S_OK, start_position));
  return S_OK;
}

HRESULT MediaFoundationStreamWrapper::QueueSeekedEvent(
    const PROPVARIANT* start_position) {
  DVLOG_FUNC(2);

  // Save the new start position in the stream.
  SetLastStartPosition(start_position);

  state_ = State::kStarted;
  RETURN_IF_FAILED(mf_media_event_queue_->QueueEventParamVar(
      MEStreamSeeked, GUID_NULL, S_OK, start_position));
  return S_OK;
}

HRESULT MediaFoundationStreamWrapper::QueueStoppedEvent() {
  DVLOG_FUNC(2);

  state_ = State::kStopped;
  RETURN_IF_FAILED(mf_media_event_queue_->QueueEventParamVar(
      MEStreamStopped, GUID_NULL, S_OK, nullptr));
  return S_OK;
}

HRESULT MediaFoundationStreamWrapper::QueuePausedEvent() {
  DVLOG_FUNC(2);

  state_ = State::kPaused;
  RETURN_IF_FAILED(mf_media_event_queue_->QueueEventParamVar(
      MEStreamPaused, GUID_NULL, S_OK, nullptr));
  return S_OK;
}

DemuxerStream::Type MediaFoundationStreamWrapper::StreamType() const {
  return stream_type_;
}

void MediaFoundationStreamWrapper::ProcessRequestsIfPossible() {
  DVLOG_FUNC(3);
  DCHECK(task_runner_->RunsTasksInCurrentSequence());

  {
    base::AutoLock auto_lock(lock_);

    if (state_ == State::kPaused || !enabled_)
      return;

    if (pending_sample_request_tokens_.empty()) {
      return;
    }
  }

  if (ServicePostFlushSampleRequest()) {
    // A sample has been consumed from the |post_flush_buffers_|.
    return;
  }

  if (!demuxer_stream_) {
    return;
  }

  base::AutoLock auto_lock(lock_);
  if (!buffer_queue_.empty()) {
    // Using queued buffer for multi buffers read from Renderer process. If
    // a valid buffer already exists in queued buffer, return the buffer
    // directly without IPC calls for buffer requested from MediaEngine.
    OnDemuxerStreamRead(buffer_queue_.front().status,
                        std::move(buffer_queue_.front().buffer));
    buffer_queue_.pop_front();
    return;
  }

  // Request multi buffers by sending IPC to 'MojoDemuxerStreamImpl'.
  if (!pending_stream_read_) {
    DVLOG_FUNC(3) << " IPC send, batch_read_count_=" << batch_read_count_;
    TRACE_EVENT2("media", "MFGetBuffersFromRendererByIPC",
                 "StreamType:", DemuxerStream::GetTypeName(stream_type_),
                 "batch_read_count_:", batch_read_count_);
    pending_stream_read_ = true;
    demuxer_stream_->Read(
        batch_read_count_,
        base::BindOnce(
            &MediaFoundationStreamWrapper::OnDemuxerStreamReadBuffers,
            weak_factory_.GetWeakPtr()));
  }
}

void MediaFoundationStreamWrapper::OnDemuxerStreamReadBuffers(
    DemuxerStream::Status status,
    DemuxerStream::DecoderBufferVector buffers) {
  DCHECK(task_runner_->RunsTasksInCurrentSequence());
  DVLOG_FUNC(3) << "receive data, status="
                << DemuxerStream::GetStatusName(status)
                << ", buffer count= " << buffers.size()
                << ", stream type=" << DemuxerStream::GetTypeName(stream_type_);
  {
    base::AutoLock auto_lock(lock_);
    DCHECK(pending_stream_read_);
    pending_stream_read_ = false;

    DemuxerStream::DecoderBufferVector pending_buffers =
        (status == DemuxerStream::Status::kOk)
            ? std::move(buffers)
            : DemuxerStream::DecoderBufferVector{nullptr};
    for (auto& buffer : pending_buffers) {
      DVLOG_FUNC(3) << "push buffer to buffer_queue_, status="
                    << DemuxerStream::GetStatusName(status) << ", buffer="
                    << (buffer ? buffer->AsHumanReadableString(false) : "null");
      buffer_queue_.emplace_back(PendingInputBuffer(status, std::move(buffer)));
    }
  }

  // Restart processing of queued requests when we receive buffers.
  ProcessRequestsIfPossible();
}

HRESULT MediaFoundationStreamWrapper::ServiceSampleRequest(
    IUnknown* token,
    DecoderBuffer* buffer) {
  DVLOG_FUNC(3);
  DCHECK(task_runner_->RunsTasksInCurrentSequence());
  lock_.AssertAcquired();

  const bool is_end_of_stream = buffer->end_of_stream();
  const bool is_encrypted = !is_end_of_stream && buffer->decrypt_config();
  const auto timestamp_us =
      is_end_of_stream ? 0 : buffer->timestamp().InMicroseconds();
  TRACE_EVENT2("media", "MediaFoundationStreamWrapper::ServiceSampleRequest",
               "is_encrypted", is_encrypted, "timestamp_us", timestamp_us);

  if (is_end_of_stream) {
    if (!enabled_) {
      DVLOG_FUNC(2) << "Ignoring EOS for disabled stream";
      // token not dropped to reflect an outstanding request that stream wrapper
      // should service when the stream is enabled
      return S_OK;
    }
    DVLOG_FUNC(2) << "End of stream";
    RETURN_IF_FAILED(mf_media_event_queue_->QueueEventParamUnk(
        MEEndOfStream, GUID_NULL, S_OK, nullptr));
    stream_ended_ = true;
    if (parent_source_) {
      static_cast<MediaFoundationSourceWrapper*>(parent_source_.Get())
          ->CheckForEndOfPresentation();
    }
  } else {
    DVLOG_FUNC(3) << "buffer ts=" << buffer->timestamp()
                  << ", is_key_frame=" << buffer->is_key_frame();
    ComPtr<IMFSample> mf_sample;
    RETURN_IF_FAILED(GenerateSampleFromDecoderBuffer(
        buffer, &mf_sample, &last_key_id_,
        base::BindOnce(&MediaFoundationStreamWrapper::TransformSample,
                       base::Unretained(this))));
    if (token) {
      RETURN_IF_FAILED(mf_sample->SetUnknown(MFSampleExtension_Token, token));
    }

    RETURN_IF_FAILED(mf_media_event_queue_->QueueEventParamUnk(
        MEMediaSample, GUID_NULL, S_OK, mf_sample.Get()));
  }

  pending_sample_request_tokens_.pop();

  return S_OK;
}

bool MediaFoundationStreamWrapper::ServicePostFlushSampleRequest() {
  DVLOG_FUNC(3);
  DCHECK(task_runner_->RunsTasksInCurrentSequence());
  HRESULT hr = S_OK;

  base::AutoLock auto_lock(lock_);
  if (flushed_ && state_ == State::kStarted &&
      last_start_time_ != kInvalidTime) {
    // Video may freeze during consecutive backward seek since MF does not
    // cancel previous pending seek, while Chromium's source starts new seek
    // immediately. MF's seek finishes when a sample's timestamp is equal to
    // or greater than seek time. Thus it would cause video to freeze until
    // source send samples with timestamps matching the previous pending seek.

    // MEStreamTick event notifies a gap in data and notify downstream
    // components not to expect any data at the specified time, allowing
    // downstream components to cancel the first seek
    // https://learn.microsoft.com/en-us/windows/win32/medfound/mestreamtick

    // Stream ticks are continuously sent until flush completes to make sure
    // all downsteam components have been rid of stale samples.
    PROPVARIANT var;
    PropVariantInit(&var);
    var.vt = VT_I8;
    var.hVal.QuadPart = last_start_time_;
    hr = mf_media_event_queue_->QueueEventParamVar(MEStreamTick, GUID_NULL,
                                                   S_OK, &var);
    if (FAILED(hr)) {
      // Failure would indicate mf no longer accepts events, such as when
      // shutdown is called, thus stream ticks should no longer be needed.
      DLOG(WARNING) << "Failed to queue stream tick: " << PrintHr(hr);
    }
    return false;

  } else if ((flushed_ && state_ != State::kStarted) ||
             post_flush_buffers_.empty()) {
    return false;
  }

  DCHECK(!pending_sample_request_tokens_.empty());
  ComPtr<IUnknown> request_token = pending_sample_request_tokens_.front();
  hr = ServiceSampleRequest(request_token.Get(),
                            post_flush_buffers_.front().get());
  if (FAILED(hr)) {
    DLOG(WARNING) << "Failed to service post flush sample: " << PrintHr(hr);
    return false;
  }

  post_flush_buffers_.pop();
  return true;
}

HRESULT MediaFoundationStreamWrapper::QueueFormatChangedEvent() {
  DVLOG_FUNC(2);

  ComPtr<IMFMediaType> media_type;
  RETURN_IF_FAILED(GetMediaType(&media_type));
  RETURN_IF_FAILED(mf_media_event_queue_->QueueEventParamUnk(
      MEStreamFormatChanged, GUID_NULL, S_OK, media_type.Get()));
  return S_OK;
}

void MediaFoundationStreamWrapper::OnDemuxerStreamRead(
    DemuxerStream::Status status,
    scoped_refptr<DecoderBuffer> buffer) {
  DVLOG_FUNC(3) << "status=" << status
                << (buffer ? " buffer=" + buffer->AsHumanReadableString(true)
                           : "");
  {
    lock_.AssertAcquired();
    ComPtr<IUnknown> token = pending_sample_request_tokens_.front();
    HRESULT hr = S_OK;

    if (status == DemuxerStream::Status::kOk) {
      if (!encryption_type_reported_) {
        encryption_type_reported_ = true;
        ReportEncryptionType(buffer);
      }

      if (has_clear_lead_ && !switched_clear_to_encrypted_ &&
          !buffer->end_of_stream() && buffer->is_encrypted()) {
        MEDIA_LOG(INFO, media_log_)
            << "Stream switched from clear to encrypted buffers.";
        switched_clear_to_encrypted_ = true;
      }

      // Push |buffer| to process later if needed. Otherwise, process it
      // immediately.
      if (flushed_ || !post_flush_buffers_.empty()) {
        DVLOG_FUNC(3) << "push buffer.";
        post_flush_buffers_.push(buffer);
      } else {
        hr = ServiceSampleRequest(token.Get(), buffer.get());
        if (FAILED(hr)) {
          DLOG(ERROR) << __func__
                      << ": ServiceSampleRequest failed: " << PrintHr(hr);
          return;
        }
      }
    } else if (status == DemuxerStream::Status::kConfigChanged) {
      DVLOG_FUNC(2) << "Stream config changed, AreFormatChangesEnabled="
                    << AreFormatChangesEnabled();
      if (AreFormatChangesEnabled()) {
        hr = QueueFormatChangedEvent();
        if (FAILED(hr)) {
          DLOG(ERROR) << __func__
                      << ": QueueFormatChangedEvent failed: " << PrintHr(hr);
          return;
        }
      } else {
        // GetMediaType() calls {audio,video}_decoder_config(), which is
        // required by DemuxerStream when kConfigChanged happens.
        ComPtr<IMFMediaType> media_type;
        hr = GetMediaType(&media_type);
        if (FAILED(hr)) {
          DLOG(ERROR) << __func__ << ": GetMediaType failed: " << PrintHr(hr);
          return;
        }
      }
    } else if (status == DemuxerStream::Status::kError) {
      DVLOG_FUNC(2) << "Stream read error";
      mf_media_event_queue_->QueueEventParamVar(
          MEError, GUID_NULL, MF_E_INVALID_STREAM_DATA, nullptr);
      return;
    } else if (status == DemuxerStream::Status::kAborted) {
      DVLOG_FUNC(2) << "Stream read aborted";
      // Continue to ProcessRequestsIfPossible() to satisfy pending sample
      // request by issuing DemuxerStream::Read() if necessary.
    } else {
      NOTREACHED() << "Unexpected demuxer stream status. status=" << status
                   << ", this=" << this;
    }
  }

  // ProcessRequestsIfPossible calls OnDemuxerStreamRead, OnDemuxerStreamRead
  // calls ProcessRequestsIfPossible, so use PostTask to avoid deadlock here.
  task_runner_->PostTask(
      FROM_HERE,
      base::BindOnce(&MediaFoundationStreamWrapper::ProcessRequestsIfPossible,
                     weak_factory_.GetWeakPtr()));
}

HRESULT MediaFoundationStreamWrapper::TransformSample(
    Microsoft::WRL::ComPtr<IMFSample>& sample) {
  DVLOG_FUNC(3);

  return S_OK;
}

HRESULT MediaFoundationStreamWrapper::GetMediaSource(
    IMFMediaSource** media_source_out) {
  DVLOG_FUNC(2);
  DCHECK(!task_runner_->RunsTasksInCurrentSequence());

  base::AutoLock auto_lock(lock_);
  if (!parent_source_) {
    DLOG(ERROR) << __func__ << ": MF_E_SHUTDOWN";
    return MF_E_SHUTDOWN;
  }
  RETURN_IF_FAILED(parent_source_.CopyTo(media_source_out));
  return S_OK;
}

HRESULT MediaFoundationStreamWrapper::GetStreamDescriptor(
    IMFStreamDescriptor** stream_descriptor_out) {
  DVLOG_FUNC(2);

  if (!mf_stream_descriptor_) {
    DLOG(ERROR) << __func__ << ": MF_E_NOT_INITIALIZED";
    return MF_E_NOT_INITIALIZED;
  }
  RETURN_IF_FAILED(mf_stream_descriptor_.CopyTo(stream_descriptor_out));
  return S_OK;
}

HRESULT MediaFoundationStreamWrapper::RequestSample(IUnknown* token) {
  DVLOG_FUNC(3);
  DCHECK(!task_runner_->RunsTasksInCurrentSequence());

  base::AutoLock auto_lock(lock_);
  // If token is nullptr, we still want to push it to represent a sample
  // request from MF.
  pending_sample_request_tokens_.push(token);

  task_runner_->PostTask(
      FROM_HERE,
      base::BindOnce(&MediaFoundationStreamWrapper::ProcessRequestsIfPossible,
                     weak_factory_.GetWeakPtr()));
  return S_OK;
}

HRESULT MediaFoundationStreamWrapper::GetEvent(DWORD flags,
                                               IMFMediaEvent** event_out) {
  DVLOG_FUNC(3);
  DCHECK(mf_media_event_queue_);

  // Not tracing hr to avoid the noise from MF_E_NO_EVENTS_AVAILABLE.
  return mf_media_event_queue_->GetEvent(flags, event_out);
}

HRESULT MediaFoundationStreamWrapper::BeginGetEvent(IMFAsyncCallback* callback,
                                                    IUnknown* state) {
  DVLOG_FUNC(3);
  DCHECK(mf_media_event_queue_);

  RETURN_IF_FAILED(mf_media_event_queue_->BeginGetEvent(callback, state));
  return S_OK;
}

HRESULT MediaFoundationStreamWrapper::EndGetEvent(IMFAsyncResult* result,
                                                  IMFMediaEvent** event_out) {
  DVLOG_FUNC(3);
  DCHECK(mf_media_event_queue_);

  RETURN_IF_FAILED(mf_media_event_queue_->EndGetEvent(result, event_out));
  return S_OK;
}

HRESULT MediaFoundationStreamWrapper::QueueEvent(MediaEventType type,
                                                 REFGUID extended_type,
                                                 HRESULT status,
                                                 const PROPVARIANT* value) {
  DVLOG_FUNC(3);
  DCHECK(mf_media_event_queue_);

  RETURN_IF_FAILED(mf_media_event_queue_->QueueEventParamVar(
      type, extended_type, status, value));
  return S_OK;
}

HRESULT MediaFoundationStreamWrapper::GenerateStreamDescriptor() {
  DVLOG_FUNC(2);

  ComPtr<IMFMediaType> media_type;
  IMFMediaType** mediaTypes = &media_type;

  RETURN_IF_FAILED(GetMediaType(&media_type));
  RETURN_IF_FAILED(MFCreateStreamDescriptor(stream_id_, 1, mediaTypes,
                                            &mf_stream_descriptor_));

  if (IsEncrypted()) {
    RETURN_IF_FAILED(mf_stream_descriptor_->SetUINT32(MF_SD_PROTECTED, 1));
  }

  return S_OK;
}

bool MediaFoundationStreamWrapper::AreFormatChangesEnabled() {
  return true;
}

GUID MediaFoundationStreamWrapper::GetLastKeyId() const {
  return last_key_id_;
}

void MediaFoundationStreamWrapper::ReportEncryptionType(
    const scoped_refptr<DecoderBuffer>& buffer) {
  auto encryption_type = EncryptionType::kClear;
  if (IsEncrypted()) {
    // Treat EOS as clear buffer which should be rare.
    bool is_buffer_encrypted =
        !buffer->end_of_stream() && buffer->decrypt_config();
    encryption_type = !is_buffer_encrypted
                          ? EncryptionType::kEncryptedWithClearLead
                          : EncryptionType::kEncrypted;
  }

  if (encryption_type == EncryptionType::kEncryptedWithClearLead) {
    MEDIA_LOG(INFO, media_log_) << "MediaFoundationStreamWrapper: "
                                << DemuxerStream::GetTypeName(stream_type_)
                                << " stream is encrypted with clear lead";
    has_clear_lead_ = true;
  }

  // TODO(xhwang): Report `encryption_type` to `PipelineStatistics` so it's
  // also reported to UKM.
}

}  // namespace media