chromium/media/fuchsia/common/vmo_buffer_writer_queue.cc

// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifdef UNSAFE_BUFFERS_BUILD
// TODO(crbug.com/40285824): Remove this and convert code to safer constructs.
#pragma allow_unsafe_buffers
#endif

#include "media/fuchsia/common/vmo_buffer_writer_queue.h"

#include <zircon/rights.h>
#include <algorithm>

#include "base/fuchsia/fuchsia_logging.h"
#include "base/process/process_metrics.h"
#include "media/base/decoder_buffer.h"

namespace media {

VmoBufferWriterQueue::PendingBuffer::PendingBuffer(
    scoped_refptr<DecoderBuffer> buffer)
    : buffer(buffer) {
  DCHECK(buffer);
}

VmoBufferWriterQueue::PendingBuffer::~PendingBuffer() = default;

VmoBufferWriterQueue::PendingBuffer::PendingBuffer(PendingBuffer&& other) =
    default;

const uint8_t* VmoBufferWriterQueue::PendingBuffer::data() const {
  return buffer->data() + buffer_pos;
}

size_t VmoBufferWriterQueue::PendingBuffer::bytes_left() const {
  return buffer->size() - buffer_pos;
}

void VmoBufferWriterQueue::PendingBuffer::AdvanceCurrentPos(size_t bytes) {
  DCHECK_LE(bytes, bytes_left());
  buffer_pos += bytes;
}

VmoBufferWriterQueue::VmoBufferWriterQueue() {
  DETACH_FROM_THREAD(thread_checker_);
}

VmoBufferWriterQueue::~VmoBufferWriterQueue() = default;

void VmoBufferWriterQueue::EnqueueBuffer(scoped_refptr<DecoderBuffer> buffer) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  pending_buffers_.emplace_back(std::move(buffer));
  PumpPackets();
}

void VmoBufferWriterQueue::Start(std::vector<VmoBuffer> buffers,
                                 SendPacketCB send_packet_cb,
                                 EndOfStreamCB end_of_stream_cb) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(buffers_.empty());
  DCHECK(!buffers.empty());

  buffers_ = std::move(buffers);
  send_packet_cb_ = std::move(send_packet_cb);
  end_of_stream_cb_ = std::move(end_of_stream_cb);

  // Initialize |unused_buffers_|.
  unused_buffers_.reserve(buffers_.size());
  for (size_t i = 0; i < buffers_.size(); ++i) {
    unused_buffers_.push_back(i);
  }

  PumpPackets();
}

bool VmoBufferWriterQueue::IsBlocked() const {
  return unused_buffers_.empty();
}

void VmoBufferWriterQueue::PumpPackets() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  auto weak_this = weak_factory_.GetWeakPtr();

  while (!buffers_.empty() && !is_paused_ &&
         input_queue_position_ < pending_buffers_.size()) {
    PendingBuffer* current_buffer = &pending_buffers_[input_queue_position_];

    if (current_buffer->buffer->end_of_stream()) {
      // Pop the EndOfStream buffer if it's the only buffer. Otherwise, mark it
      // as complete so that ReleaseBuffer will pop it.
      if (input_queue_position_ == 0) {
        pending_buffers_.pop_front();
        DCHECK(pending_buffers_.empty());
      } else {
        current_buffer->is_complete = true;
        input_queue_position_ += 1;
      }
      end_of_stream_cb_.Run();
      if (!weak_this)
        return;
      continue;
    }

    if (unused_buffers_.empty()) {
      // No input buffer available.
      return;
    }

    size_t buffer_index = unused_buffers_.back();
    unused_buffers_.pop_back();

    size_t bytes_filled = buffers_[buffer_index].Write(
        base::make_span(current_buffer->data(), current_buffer->bytes_left()));
    current_buffer->AdvanceCurrentPos(bytes_filled);

    bool buffer_end = current_buffer->bytes_left() == 0;

    StreamProcessorHelper::IoPacket packet(
        buffer_index, /*offset=*/0, bytes_filled,
        current_buffer->buffer->timestamp(), buffer_end, /*key_frame=*/false,
        base::BindOnce(&VmoBufferWriterQueue::ReleaseBuffer,
                       weak_factory_.GetWeakPtr(), buffer_index));

    if (buffer_end) {
      current_buffer->tail_sysmem_buffer_index = buffer_index;
      input_queue_position_ += 1;
    }

    send_packet_cb_.Run(current_buffer->buffer.get(), std::move(packet));
    if (!weak_this)
      return;
  }
}

void VmoBufferWriterQueue::ResetQueue() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  pending_buffers_.clear();
  input_queue_position_ = 0;
  is_paused_ = false;
}

void VmoBufferWriterQueue::ResetBuffers() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  buffers_.clear();
  send_packet_cb_ = SendPacketCB();
  end_of_stream_cb_ = EndOfStreamCB();

  // Invalidate weak pointers, so ReleaseBuffer() is not called for the old
  // buffers.
  weak_factory_.InvalidateWeakPtrs();
}

void VmoBufferWriterQueue::ResetPositionAndPause() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  for (auto& buffer : pending_buffers_) {
    buffer.buffer_pos = 0;
    buffer.is_complete = false;

    // All packets that were pending will need to be resent. Reset
    // |tail_sysmem_buffer_index| to ensure that these packets are not removed
    // from the queue in ReleaseBuffer().
    buffer.tail_sysmem_buffer_index = std::nullopt;
  }
  input_queue_position_ = 0;
  is_paused_ = true;
}

void VmoBufferWriterQueue::Unpause() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(is_paused_);
  is_paused_ = false;
  PumpPackets();
}

void VmoBufferWriterQueue::ReleaseBuffer(size_t buffer_index) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!buffers_.empty());

  // Mark the input buffer as complete.
  for (size_t i = 0; i < input_queue_position_; ++i) {
    if (pending_buffers_[i].tail_sysmem_buffer_index == buffer_index)
      pending_buffers_[i].is_complete = true;
  }

  // Remove all complete buffers from the head of the queue since we no longer
  // need them. Note that currently StreamProcessor doesn't guarantee that input
  // buffers are released in the same order they were sent (see
  // https://fuchsia.googlesource.com/fuchsia/+/3b12c8c5/sdk/fidl/fuchsia.media/stream_processor.fidl#1646
  // ). This means that some complete buffers will need to stay in the queue
  // until all preceding packets are released as well.
  while (!pending_buffers_.empty() && pending_buffers_.front().is_complete) {
    pending_buffers_.pop_front();
    DCHECK_GT(input_queue_position_, 0U);
    input_queue_position_--;
  }

  unused_buffers_.push_back(buffer_index);
  PumpPackets();
}

size_t VmoBufferWriterQueue::num_buffers() const {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  return buffers_.size();
}

}  // namespace media