chromium/third_party/mediapipe/src/mediapipe/calculators/tensor/audio_to_tensor_calculator.cc

// Copyright 2022 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <cmath>
#include <cstring>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/log/absl_check.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
#include "audio/dsp/resampler_q.h"
#include "audio/dsp/window_functions.h"
#include "mediapipe/calculators/tensor/audio_to_tensor_calculator.pb.h"
#include "mediapipe/framework/api2/node.h"
#include "mediapipe/framework/api2/packet.h"
#include "mediapipe/framework/api2/port.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/formats/matrix.h"
#include "mediapipe/framework/formats/tensor.h"
#include "mediapipe/framework/formats/time_series_header.pb.h"
#include "mediapipe/framework/memory_manager.h"
#include "mediapipe/framework/memory_manager_service.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/util/time_series_util.h"
#include "pffft.h"

namespace mediapipe {
namespace api2 {
namespace {

using Options = ::mediapipe::AudioToTensorCalculatorOptions;
using DftTensorFormat = Options::DftTensorFormat;
using FlushMode = Options::FlushMode;

std::vector<float> HannWindow(int window_size, bool sqrt_hann) {
  std::vector<float> hann_window(window_size);
  audio_dsp::HannWindow().GetPeriodicSamples(window_size, &hann_window);
  if (sqrt_hann) {
    absl::c_transform(hann_window, hann_window.begin(),
                      [](double x) { return std::sqrt(x); });
  }
  return hann_window;
}

// PFFFT only supports transforms for inputs of length N of the form
// N = (2^a)*(3^b)*(5^c) where b >=0 and c >= 0 and a >= 5 for the real FFT.
bool IsValidFftSize(int size) {
  if (size <= 0) {
    return false;
  }
  constexpr int kFactors[] = {2, 3, 5};
  int factorization[] = {0, 0, 0};
  int n = static_cast<int>(size);
  for (int i = 0; i < 3; ++i) {
    while (n % kFactors[i] == 0) {
      n = n / kFactors[i];
      ++factorization[i];
    }
  }
  return factorization[0] >= 5 && n == 1;
}

}  // namespace

// Converts audio buffers into tensors, possibly with resampling, buffering
// and framing, according to specified inputs and options. All input audio
// buffers will be first resampled from the input sample rate to the target
// sample rate if they are not equal. The resampled audio data (with the
// buffered samples from the previous runs in the streaming mode) will be broken
// into fixed-sized, possibly overlapping frames. If the calculator is not asked
// to perform fft (the fft_size is not set in the calculator options), all
// frames will be converted to and outputted as MediaPipe Tensors. The last
// output tensor will be zero-padding if the remaining samples are insufficient.
// Otherwise, when the fft_size is set and valid, the calculator will perform
// fft on the fixed-sized audio frames, the complex DFT results will be
// converted to and outputted as 2D MediaPipe float Tensors where the first
// rows are the DFT real parts and the second rows are the DFT imagery parts.
//
// This calculator assumes that the input timestamps refer to the first
// sample in each Matrix. The output timestamps follow this same convention.
// One Process() call may output multiple tensors packets. The timestamps of
// the output packets are determined by the timestamp of the previous output
// packet, the target sample rate, and the number of samples advanced after the
// previous output.
//
// The calculator has two running modes:
//   Streaming mode: when "stream_mode" is set to true in the calculator
//     options, the calculator treats the input audio stream as a continuous
//     stream. Thus, any samples that are not consumed in the previous runs will
//     be cached in a global sample buffer. The audio data resampled from the
//     current raw audio input will be appended to the global sample buffer.
//     The calculator will process the global sample buffer and output as many
//     tensors as possible.
//   Non-streaming mode: when "stream_mode" is set to false in the calculator
//     options, the calculators treats the packets in the input audio stream as
//     a batch of unrelated audio buffers. In each Process() call, the input
//     buffer will be first resampled, and framed as fixed-sized, possibly
//     overlapping tensors. The last tensor produced by a Process() invocation
//     will be zero-padding if the remaining samples are insufficient. As the
//     calculator treats the input packets as unrelated, all samples will be
//     processed immediately and no samples will be cached in the global sample
//     buffer.
//
// Inputs:
//   AUDIO - mediapipe::Matrix
//     The audio data represented as mediapipe::Matrix.
//   SAMPLE_RATE - double @Optional
//     The sample rate of the corresponding audio data in the "AUDIO" stream.
//     If a sample rate packet is provided at Timestamp::PreStream(), the sample
//     rate will be used as the sample rate of every audio packets in the
//     "AUDIO" stream. Note that one and only one of the "AUDIO" stream's time
//     series header or the "SAMPLE_RATE" stream can exist.
//
// Outputs:
//   TENSORS - std::vector<Tensor>
//     Vector containing a single Tensor that represents a fix-sized audio
//     frame or the complex DFT results.
//   TIMESTAMPS - std::vector<Timestamp> @Optional
//     Vector containing the output timestamps emitted by the current Process()
//     invocation. In the non-streaming mode, the vector contains all of the
//     output timestamps for an input audio buffer.
//   DC_AND_NYQUIST - std::pair<float, float> @Optional.
//     A pair of dc component and nyquist component. Only can be connected when
//     the calculator performs fft (the fft_size is set in the calculator
//     options).
//
// Example:
// node {
//   calculator: "AudioToTensorCalculator"
//   input_stream: "AUDIO:audio"
//   output_stream: "TENSORS:tensors"
//   output_stream: "TIMESTAMPS:timestamps"
//   options {
//     [mediapipe.AudioToTensorCalculatorOptions.ext] {
//       num_channels: 2
//       num_samples: 512
//       num_overlapping_samples: 64
//       target_sample_rate: 16000
//       stream_mode: true # or false
//     }
//   }
// }
class AudioToTensorCalculator : public Node {
 public:
  static constexpr Input<Matrix> kAudioIn{"AUDIO"};
  // TODO: Removes this optional input stream when the "AUDIO" stream
  // uses the new mediapipe audio data containers that carry audio metadata,
  // such as sample rate.
  static constexpr Input<double>::Optional kAudioSampleRateIn{"SAMPLE_RATE"};
  static constexpr Output<std::vector<Tensor>> kTensorsOut{"TENSORS"};
  static constexpr Output<std::pair<float, float>>::Optional kDcAndNyquistOut{
      "DC_AND_NYQUIST"};
  // A vector of the output timestamps emitted by the current Process()
  // invocation. The packet timestamp is the last emitted timestamp.
  static constexpr Output<std::vector<Timestamp>>::Optional kTimestampsOut{
      "TIMESTAMPS"};
  MEDIAPIPE_NODE_CONTRACT(kAudioIn, kAudioSampleRateIn, kTensorsOut,
                          kDcAndNyquistOut, kTimestampsOut);

  static absl::Status UpdateContract(CalculatorContract* cc);
  absl::Status Open(CalculatorContext* cc);
  absl::Status Process(CalculatorContext* cc);
  absl::Status Close(CalculatorContext* cc);

 private:
  // The target number of channels.
  int num_channels_;
  // The target number of samples per channel.
  int num_samples_;
  // The number of samples per channel to advance after the current frame is
  // processed.
  int frame_step_;
  bool stream_mode_;
  bool check_inconsistent_timestamps_;
  int padding_samples_before_;
  int padding_samples_after_;
  FlushMode flush_mode_;
  DftTensorFormat dft_tensor_format_;

  Timestamp initial_timestamp_ = Timestamp::Unstarted();
  int64_t cumulative_input_samples_ = 0;
  Timestamp next_output_timestamp_ = Timestamp::Unstarted();

  double source_sample_rate_ = -1;
  double target_sample_rate_ = -1;
  // TODO: Configures QResamplerParams through calculator options.
  audio_dsp::QResamplerParams params_;
  // A QResampler instance to resample an audio stream.
  std::unique_ptr<audio_dsp::QResampler<float>> resampler_;
  Matrix sample_buffer_;
  int processed_buffer_cols_ = 0;
  double gain_ = 1.0;

  // Enable pooling of AHWBs in Tensor instances.
  MemoryManager* memory_manager_ = nullptr;

  // The internal state of the FFT library.
  PFFFT_Setup* fft_state_ = nullptr;
  int fft_size_ = 0;
  std::vector<float> fft_window_;
  std::vector<float, Eigen::aligned_allocator<float>> fft_input_buffer_;
  // pffft requires memory to work with to avoid using the stack.
  std::vector<float, Eigen::aligned_allocator<float>> fft_workplace_;
  std::vector<float, Eigen::aligned_allocator<float>> fft_output_;

  absl::Status ProcessStreamingData(CalculatorContext* cc, const Matrix& input);
  absl::Status ProcessNonStreamingData(CalculatorContext* cc,
                                       const Matrix& input);

  absl::Status SetupStreamingResampler(double input_sample_rate_);
  void AppendToSampleBuffer(Matrix buffer_to_append);
  void AppendZerosToSampleBuffer(int num_samples);

  absl::StatusOr<std::vector<Tensor>> ConvertToTensor(
      const Matrix& block, std::vector<int> tensor_dims);
  absl::Status OutputTensor(const Matrix& block, Timestamp timestamp,
                            CalculatorContext* cc);
  absl::Status ProcessBuffer(const Matrix& buffer, bool should_flush,
                             CalculatorContext* cc);
};

absl::Status AudioToTensorCalculator::UpdateContract(CalculatorContract* cc) {
  const auto& options = cc->Options<Options>();
  if (!options.has_num_channels() || !options.has_num_samples() ||
      !options.has_target_sample_rate()) {
    return absl::InvalidArgumentError(
        "AudioToTensorCalculatorOptions must specify "
        "`num_channels`, `num_samples`, and `target_sample_rate`.");
  }
  if (options.stream_mode()) {
    // Explicitly disables timestamp offset to disallow the timestamp bound
    // from the input streams to be propagated to the output streams.
    // In the streaming mode, the output timestamp bound is based on
    // next_output_timestamp_, which can be smaller than the current input
    // timestamps.
    cc->SetTimestampOffset(TimestampDiff::Unset());
  }
  if (options.padding_samples_before() < 0 ||
      options.padding_samples_after() < 0) {
    return absl::InvalidArgumentError("Negative zero padding unsupported");
  }
  if (options.flush_mode() != Options::ENTIRE_TAIL_AT_TIMESTAMP_MAX &&
      options.flush_mode() != Options::PROCEED_AS_USUAL) {
    return absl::InvalidArgumentError("Unsupported flush mode");
  }
  cc->UseService(kMemoryManagerService).Optional();
  return absl::OkStatus();
}

absl::Status AudioToTensorCalculator::Open(CalculatorContext* cc) {
  if (cc->Service(kMemoryManagerService).IsAvailable()) {
    memory_manager_ = &cc->Service(kMemoryManagerService).GetObject();
  }
  const auto& options =
      cc->Options<mediapipe::AudioToTensorCalculatorOptions>();
  num_channels_ = options.num_channels();
  num_samples_ = options.num_samples();
  if (options.has_num_overlapping_samples()) {
    RET_CHECK_GE(options.num_overlapping_samples(), 0);
    RET_CHECK_LT(options.num_overlapping_samples(), num_samples_);
    frame_step_ = num_samples_ - options.num_overlapping_samples();
  } else {
    frame_step_ = num_samples_;
  }
  target_sample_rate_ = options.target_sample_rate();
  stream_mode_ = options.stream_mode();
  if (stream_mode_) {
    check_inconsistent_timestamps_ = options.check_inconsistent_timestamps();
    sample_buffer_.resize(num_channels_, Eigen::NoChange);
  }
  padding_samples_before_ = options.padding_samples_before();
  padding_samples_after_ = options.padding_samples_after();
  dft_tensor_format_ = options.dft_tensor_format();
  flush_mode_ = options.flush_mode();
  if (options.has_volume_gain_db()) {
    gain_ = pow(10, options.volume_gain_db() / 20.0);
  }
  if (options.has_source_sample_rate()) {
    source_sample_rate_ = options.source_sample_rate();
  } else {
    RET_CHECK(kAudioSampleRateIn(cc).IsConnected() ^
              !kAudioIn(cc).Header().IsEmpty())
        << "Must either specify the time series header of the \"AUDIO\" stream "
           "or have the \"SAMPLE_RATE\" stream connected.";
    if (!kAudioIn(cc).Header().IsEmpty()) {
      mediapipe::TimeSeriesHeader input_header;
      MP_RETURN_IF_ERROR(
          mediapipe::time_series_util::FillTimeSeriesHeaderIfValid(
              kAudioIn(cc).Header(), &input_header));
      if (stream_mode_) {
        MP_RETURN_IF_ERROR(SetupStreamingResampler(input_header.sample_rate()));
      } else {
        source_sample_rate_ = input_header.sample_rate();
      }
    }
  }
  AppendZerosToSampleBuffer(padding_samples_before_);
  if (options.has_fft_size()) {
    RET_CHECK(IsValidFftSize(options.fft_size()))
        << "FFT size must be of the form fft_size = (2^a)*(3^b)*(5^c) where b "
           ">=0 and c >= 0 and a >= 5, the requested fft size is "
        << options.fft_size();
    RET_CHECK_EQ(1, num_channels_)
        << "Currently only support applying FFT on mono channel.";
    fft_size_ = options.fft_size();
    fft_state_ = pffft_new_setup(fft_size_, PFFFT_REAL);
    fft_window_ = HannWindow(fft_size_, /* sqrt_hann = */ false);
    fft_input_buffer_.resize(fft_size_);
    fft_workplace_.resize(fft_size_);
    fft_output_.resize(fft_size_);
  } else {
    RET_CHECK(!kDcAndNyquistOut(cc).IsConnected())
        << "The DC_AND_NYQUIST output stream can only be connected when the "
           "calculator outputs fft tensors";
  }
  return absl::OkStatus();
}

absl::Status AudioToTensorCalculator::Process(CalculatorContext* cc) {
  if (cc->InputTimestamp() == Timestamp::PreStream()) {
    double current_source_sample_rate = kAudioSampleRateIn(cc).Get();
    if (cc->Options<mediapipe::AudioToTensorCalculatorOptions>()
            .stream_mode()) {
      return SetupStreamingResampler(current_source_sample_rate);
    } else {
      source_sample_rate_ = current_source_sample_rate;
      return absl::OkStatus();
    }
  }
  // Sanity checks.
  const auto& input_frame = kAudioIn(cc).Get();
  const bool channels_match = input_frame.rows() == num_channels_;
  // The special case of `num_channels_ == 1` is automatic mixdown to mono.
  const bool mono_output = num_channels_ == 1;
  if (!mono_output && !channels_match) {
    return absl::InvalidArgumentError(absl::StrFormat(
        "Audio input has %d channel(s) but the model requires %d channel(s).",
        input_frame.rows(), num_channels_));
  }
  if (!mono_output && input_frame.IsRowMajor) {
    return absl::InvalidArgumentError(
        "The audio data should be stored in column-major.");
  }
  ABSL_CHECK(channels_match || mono_output);
  const Matrix& input = channels_match ? input_frame
                                       // Mono mixdown.
                                       : input_frame.colwise().mean();
  if (gain_ != 1.0) {
    return stream_mode_ ? ProcessStreamingData(cc, input * gain_)
                        : ProcessNonStreamingData(cc, input * gain_);
  }
  return stream_mode_ ? ProcessStreamingData(cc, input)
                      : ProcessNonStreamingData(cc, input);
}

absl::Status AudioToTensorCalculator::Close(CalculatorContext* cc) {
  if (!stream_mode_) {
    return absl::OkStatus();
  }
  if (resampler_) {
    Matrix resampled_buffer(num_channels_, 0);
    resampler_->Flush(&resampled_buffer);
    AppendToSampleBuffer(std::move(resampled_buffer));
  }
  AppendZerosToSampleBuffer(padding_samples_after_);
  MP_RETURN_IF_ERROR(ProcessBuffer(sample_buffer_, /*should_flush=*/true, cc));
  if (fft_state_) {
    pffft_destroy_setup(fft_state_);
  }
  return absl::OkStatus();
}

absl::Status AudioToTensorCalculator::ProcessStreamingData(
    CalculatorContext* cc, const Matrix& input) {
  const auto& input_buffer = input;
  if (initial_timestamp_ == Timestamp::Unstarted()) {
    initial_timestamp_ = cc->InputTimestamp();
    next_output_timestamp_ = initial_timestamp_;
  }
  if (source_sample_rate_ != -1 && check_inconsistent_timestamps_) {
    mediapipe::time_series_util::LogWarningIfTimestampIsInconsistent(
        cc->InputTimestamp(), initial_timestamp_, cumulative_input_samples_,
        source_sample_rate_);
    cumulative_input_samples_ += input_buffer.cols();
  }
  if (!kAudioSampleRateIn(cc).IsEmpty()) {
    double current_source_sample_rate = kAudioSampleRateIn(cc).Get();
    if (resampler_) {
      RET_CHECK_EQ(current_source_sample_rate, source_sample_rate_);
    } else {
      MP_RETURN_IF_ERROR(SetupStreamingResampler(current_source_sample_rate));
    }
  }

  if (resampler_) {
    Matrix resampled_buffer(num_channels_, 0);
    resampler_->ProcessSamples(input_buffer, &resampled_buffer);
    AppendToSampleBuffer(std::move(resampled_buffer));
  } else {
    // Tries to consume the input matrix first to avoid extra data copy.
    auto status_or_matrix = kAudioIn(cc).packet().Consume<Matrix>();
    if (status_or_matrix.ok()) {
      Matrix local_matrix(num_channels_, 0);
      local_matrix.swap(*status_or_matrix.value());
      AppendToSampleBuffer(std::move(local_matrix));
    } else {
      AppendToSampleBuffer(input_buffer);
    }
  }

  MP_RETURN_IF_ERROR(ProcessBuffer(sample_buffer_, /*should_flush=*/false, cc));
  // Removes the processed samples from the global sample buffer.
  sample_buffer_ = Matrix(sample_buffer_.rightCols(sample_buffer_.cols() -
                                                   processed_buffer_cols_ - 1));
  return absl::OkStatus();
}

absl::Status AudioToTensorCalculator::ProcessNonStreamingData(
    CalculatorContext* cc, const Matrix& input) {
  initial_timestamp_ = cc->InputTimestamp();
  next_output_timestamp_ = initial_timestamp_;
  const auto& input_frame = input;
  double source_sample_rate = kAudioSampleRateIn(cc).GetOr(source_sample_rate_);

  if (source_sample_rate != -1 && source_sample_rate != target_sample_rate_) {
    std::vector<float> resampled = audio_dsp::QResampleSignal<float>(
        source_sample_rate, target_sample_rate_, num_channels_, params_,
        input_frame);
    Eigen::Map<const Matrix> matrix_mapping(resampled.data(), num_channels_,
                                            resampled.size() / num_channels_);
    return ProcessBuffer(matrix_mapping, /*should_flush=*/true, cc);
  }
  return ProcessBuffer(input_frame, /*should_flush=*/true, cc);
}

absl::Status AudioToTensorCalculator::SetupStreamingResampler(
    double input_sample_rate) {
  if (input_sample_rate == source_sample_rate_) {
    return absl::OkStatus();
  }
  source_sample_rate_ = input_sample_rate;
  if (source_sample_rate_ != target_sample_rate_) {
    resampler_ = absl::make_unique<audio_dsp::QResampler<float>>(
        source_sample_rate_, target_sample_rate_, num_channels_, params_);
    if (!resampler_) {
      return absl::InternalError("Failed to initialize resampler.");
    }
  }
  return absl::OkStatus();
}

void AudioToTensorCalculator::AppendZerosToSampleBuffer(int num_samples) {
  ABSL_CHECK_GE(num_samples, 0);  // Ensured by `UpdateContract`.
  if (num_samples == 0) {
    return;
  }
  sample_buffer_.conservativeResize(Eigen::NoChange,
                                    sample_buffer_.cols() + num_samples);
  sample_buffer_.rightCols(num_samples).setZero();
}

void AudioToTensorCalculator::AppendToSampleBuffer(Matrix buffer_to_append) {
  sample_buffer_.conservativeResize(
      Eigen::NoChange, sample_buffer_.cols() + buffer_to_append.cols());
  sample_buffer_.rightCols(buffer_to_append.cols()).swap(buffer_to_append);
}

absl::StatusOr<std::vector<Tensor>> AudioToTensorCalculator::ConvertToTensor(
    const Matrix& block, std::vector<int> tensor_dims) {
  Tensor tensor(Tensor::ElementType::kFloat32, Tensor::Shape(tensor_dims),
                memory_manager_);
  auto buffer_view = tensor.GetCpuWriteView();
  int total_size = 1;
  for (int dim : tensor_dims) {
    total_size *= dim;
  }
  if (block.size() < total_size) {
    std::memset(buffer_view.buffer<float>(), 0, tensor.bytes());
  }
  std::memcpy(buffer_view.buffer<float>(), block.data(),
              block.size() * sizeof(float));
  std::vector<Tensor> tensor_vector;
  tensor_vector.push_back(std::move(tensor));
  return tensor_vector;
}

absl::Status AudioToTensorCalculator::OutputTensor(const Matrix& block,
                                                   Timestamp timestamp,
                                                   CalculatorContext* cc) {
  std::vector<Tensor> output_tensor;
  if (fft_state_) {
    Eigen::VectorXf time_series_data =
        Eigen::VectorXf::Map(block.data(), block.size());
    //  Window on input audio prior to FFT.
    std::transform(time_series_data.begin(), time_series_data.end(),
                   fft_window_.begin(), fft_input_buffer_.begin(),
                   std::multiplies<float>());
    pffft_transform_ordered(fft_state_, fft_input_buffer_.data(),
                            fft_output_.data(), fft_workplace_.data(),
                            PFFFT_FORWARD);
    if (kDcAndNyquistOut(cc).IsConnected()) {
      kDcAndNyquistOut(cc).Send(std::make_pair(fft_output_[0], fft_output_[1]),
                                timestamp);
    }
    switch (dft_tensor_format_) {
      case Options::WITH_NYQUIST: {
        Matrix fft_output_matrix =
            Eigen::Map<const Matrix>(fft_output_.data() + 2, 1, fft_size_ - 2);
        fft_output_matrix.conservativeResize(Eigen::NoChange, fft_size_);
        // The last two elements are Nyquist component.
        fft_output_matrix(fft_size_ - 2) = fft_output_[1];  // Nyquist real part
        fft_output_matrix(fft_size_ - 1) = 0.0f;  // Nyquist imagery part
        MP_ASSIGN_OR_RETURN(output_tensor, ConvertToTensor(fft_output_matrix,
                                                           {2, fft_size_ / 2}));
        break;
      }
      case Options::WITH_DC_AND_NYQUIST: {
        Matrix fft_output_matrix =
            Eigen::Map<const Matrix>(fft_output_.data(), 1, fft_size_);
        fft_output_matrix.conservativeResize(Eigen::NoChange, fft_size_ + 2);
        fft_output_matrix(1) = 0.0f;  // DC imagery part.
        // The last two elements are  Nyquist component.
        fft_output_matrix(fft_size_) = fft_output_[1];  // Nyquist real part
        fft_output_matrix(fft_size_ + 1) = 0.0f;        // Nyquist imagery part
        MP_ASSIGN_OR_RETURN(
            output_tensor,
            ConvertToTensor(fft_output_matrix, {2, (fft_size_ + 2) / 2}));
        break;
      }
      case Options::WITHOUT_DC_AND_NYQUIST: {
        Matrix fft_output_matrix =
            Eigen::Map<const Matrix>(fft_output_.data() + 2, 1, fft_size_ - 2);
        MP_ASSIGN_OR_RETURN(
            output_tensor,
            ConvertToTensor(fft_output_matrix, {2, (fft_size_ - 2) / 2}));
        break;
      }
      default:
        return absl::InvalidArgumentError("Unsupported dft tensor format.");
    }

  } else {
    MP_ASSIGN_OR_RETURN(output_tensor,
                        ConvertToTensor(block, {num_channels_, num_samples_}));
  }
  kTensorsOut(cc).Send(std::move(output_tensor), timestamp);
  return absl::OkStatus();
}

absl::Status AudioToTensorCalculator::ProcessBuffer(const Matrix& buffer,
                                                    bool should_flush,
                                                    CalculatorContext* cc) {
  const bool should_flush_at_timestamp_max =
      stream_mode_ && should_flush &&
      flush_mode_ == Options::ENTIRE_TAIL_AT_TIMESTAMP_MAX;
  int next_frame_first_col = 0;
  std::vector<Timestamp> timestamps;
  if (!should_flush_at_timestamp_max) {
    while (next_frame_first_col + num_samples_ <= buffer.cols()) {
      MP_RETURN_IF_ERROR(OutputTensor(
          buffer.block(0, next_frame_first_col, num_channels_, num_samples_),
          next_output_timestamp_, cc));
      timestamps.push_back(next_output_timestamp_);
      next_output_timestamp_ += round(frame_step_ / target_sample_rate_ *
                                      Timestamp::kTimestampUnitsPerSecond);
      next_frame_first_col += frame_step_;
    }
  }
  if (should_flush && next_frame_first_col < buffer.cols()) {
    // In the streaming mode, the flush happens in Close() and a packet at
    // Timestamp::Max() will be emitted. In the non-streaming mode, each
    // Process() invocation will process the entire buffer completely.
    Timestamp timestamp = should_flush_at_timestamp_max
                              ? Timestamp::Max()
                              : next_output_timestamp_;
    MP_RETURN_IF_ERROR(OutputTensor(
        buffer.block(
            0, next_frame_first_col, num_channels_,
            std::min(num_samples_, (int)buffer.cols() - next_frame_first_col)),
        timestamp, cc));
    timestamps.push_back(timestamp);
  }
  if (kTimestampsOut(cc).IsConnected()) {
    Timestamp timestamp = timestamps.back();
    kTimestampsOut(cc).Send(std::move(timestamps), timestamp);
  }
  processed_buffer_cols_ = next_frame_first_col - 1;
  return absl::OkStatus();
}

MEDIAPIPE_REGISTER_NODE(AudioToTensorCalculator);

}  // namespace api2
}  // namespace mediapipe