chromium/third_party/mediapipe/src/mediapipe/calculators/tensorflow/lapped_tensor_buffer_calculator.cc

// Copyright 2018 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <vector>

#include "absl/memory/memory.h"
#include "absl/types/span.h"
#include "mediapipe/calculators/tensorflow/lapped_tensor_buffer_calculator.pb.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/profiler/circular_buffer.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/framework/tensor_util.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/lib/core/status.h"

namespace mediapipe {

// Tags of the optional input side packets. When present, the first three
// override the matching integer option and the last one replaces the whole
// options proto.
constexpr char kBufferSize[] = "BUFFER_SIZE";
constexpr char kOverlap[] = "OVERLAP";
constexpr char kTimestampOffset[] = "TIMESTAMP_OFFSET";
constexpr char kCalculatorOptions[] = "CALCULATOR_OPTIONS";

// Short alias for the TensorFlow namespace; the tensor types used in this file
// are spelled tf::Tensor, tf::TensorShape, etc.
namespace tf = tensorflow;

// Given an input stream of tensors, concatenates the tensors over timesteps.
// The concatenated output tensors can be specified to have overlap between
// output timesteps. The tensors are concatenated along the first dimension, and
// a flag controls whether a new first dimension is inserted before
// concatenation.
//
// The number of tensors output will be buffer_size less than the
// number of input tensors unless padding is set to a non-zero value in the
// options proto.
//
// By default, the timestamp of each output batch matches the timestamp of the
// first tensor in that batch (e.g. once buffer_size frames have been added, the
// output tensor carries the timestamp of the first of those frames). This
// behavior can be adjusted with the timestamp_offset option.
//
// Example config without padding:
// node {
//   calculator: "LappedTensorBufferCalculator"
//   input_stream: "input_tensor"
//   output_stream: "output_tensor"
//   options {
//     [mediapipe.LappedTensorBufferCalculatorOptions.ext] {
//       buffer_size: 2
//       overlap: 1
//       add_batch_dim_to_tensors: false
//     }
//   }
// }
//
// Example config with padding and timestamp output:
// node {
//   calculator: "LappedTensorBufferCalculator"
//   input_stream: "input_tensor"
//   output_stream: "output_tensor"
//   output_stream: "output_timestamp"
//   options {
//     [mediapipe.LappedTensorBufferCalculatorOptions.ext] {
//       buffer_size: 100
//       overlap: 50
//       add_batch_dim_to_tensors: true
//       timestamp_offset: 25
//       padding: 25
//     }
//   }
// }

class LappedTensorBufferCalculator : public CalculatorBase {
 public:
  // Declares the accepted input/output streams and input side packets.
  static absl::Status GetContract(CalculatorContract* cc);

  // Resolves the effective configuration, validates it and allocates the
  // circular buffers.
  absl::Status Open(CalculatorContext* cc) override;
  // Buffers the incoming tensor and emits a concatenated batch once enough
  // tensors have arrived.
  absl::Status Process(CalculatorContext* cc) override;
  // When padding is enabled and at least one input was seen, pads the end of
  // the stream and emits one final batch.
  absl::Status Close(CalculatorContext* cc) override;

 private:
  // Adds a batch dimension to the input tensor if specified in the
  // calculator options.
  absl::Status AddBatchDimension(tf::Tensor* input_tensor);
  // Sends the current buffer downstream.
  absl::Status ProcessBuffer(CalculatorContext* cc);

  // Number of further inputs needed before the next batch is emitted.
  int steps_until_output_ = 0;
  // Number of tensors concatenated into each output batch.
  int buffer_size_ = 0;
  // Number of tensors shared between consecutive output batches.
  int overlap_ = 0;
  // Index into the timestamp buffer of the timestamp used for each output.
  int timestamp_offset_ = 0;
  // True once the first input has been processed (and any start padding has
  // been pushed).
  bool initialized_ = false;

  // Timestamps of the buffered tensors, kept in step with `buffer_`.
  std::unique_ptr<CircularBuffer<Timestamp>> timestamp_buffer_;
  // The most recent tensors awaiting concatenation.
  std::unique_ptr<CircularBuffer<tf::Tensor>> buffer_;
  // Effective options: node options, or the CALCULATOR_OPTIONS side packet
  // when one is provided.
  LappedTensorBufferCalculatorOptions options_;
};

REGISTER_CALCULATOR(LappedTensorBufferCalculator);

// Declares the calculator contract:
//   * exactly one input stream of tf::Tensor;
//   * one output stream of concatenated tf::Tensor, plus an optional second
//     output stream carrying the timestamps of the tensors in each batch;
//   * optional BUFFER_SIZE / OVERLAP / TIMESTAMP_OFFSET (int) and
//     CALCULATOR_OPTIONS side packets.
// Returns an error status if the number of streams is not supported.
absl::Status LappedTensorBufferCalculator::GetContract(CalculatorContract* cc) {
  RET_CHECK_EQ(cc->Inputs().NumEntries(), 1)
      << "Only one input stream is supported.";
  // tensorflow::Tensor stream.
  cc->Inputs().Index(0).Set<tf::Tensor>();

  // Output index 0 is configured unconditionally below, so a graph that
  // declares no output stream must be rejected here with a proper status.
  RET_CHECK_GE(cc->Outputs().NumEntries(), 1)
      << "Only one or two output stream(s) is/are supported.";
  RET_CHECK_LE(cc->Outputs().NumEntries(), 2)
      << "Only one or two output stream(s) is/are supported.";

  if (cc->InputSidePackets().HasTag(kBufferSize)) {
    cc->InputSidePackets().Tag(kBufferSize).Set<int>();
  }
  if (cc->InputSidePackets().HasTag(kOverlap)) {
    cc->InputSidePackets().Tag(kOverlap).Set<int>();
  }
  if (cc->InputSidePackets().HasTag(kTimestampOffset)) {
    cc->InputSidePackets().Tag(kTimestampOffset).Set<int>();
  }
  if (cc->InputSidePackets().HasTag(kCalculatorOptions)) {
    cc->InputSidePackets()
        .Tag(kCalculatorOptions)
        .Set<LappedTensorBufferCalculatorOptions>();
  }

  // Output tensorflow::Tensor stream with possibly overlapping steps.
  cc->Outputs().Index(0).Set<tf::Tensor>();
  // Output timestamp stream with possibly overlapping steps.
  if (cc->Outputs().NumEntries() > 1) {
    cc->Outputs().Index(1).Set<std::vector<Timestamp>>();
  }
  return absl::OkStatus();
}

// Resolves the effective configuration, validates it and allocates the
// buffers.
//
// The options come from the node options, or from the CALCULATOR_OPTIONS side
// packet when it is present. buffer_size, overlap and timestamp_offset can
// each be further overridden by their own side packet.
//
// Returns an error status when the configuration is inconsistent.
absl::Status LappedTensorBufferCalculator::Open(CalculatorContext* cc) {
  options_ = cc->Options<LappedTensorBufferCalculatorOptions>();
  if (cc->InputSidePackets().HasTag(kCalculatorOptions)) {
    options_ = cc->InputSidePackets()
                   .Tag(kCalculatorOptions)
                   .Get<LappedTensorBufferCalculatorOptions>();
  }
  buffer_size_ = options_.buffer_size();
  if (cc->InputSidePackets().HasTag(kBufferSize)) {
    buffer_size_ = cc->InputSidePackets().Tag(kBufferSize).Get<int>();
  }
  overlap_ = options_.overlap();
  if (cc->InputSidePackets().HasTag(kOverlap)) {
    overlap_ = cc->InputSidePackets().Tag(kOverlap).Get<int>();
  }
  timestamp_offset_ = options_.timestamp_offset();
  if (cc->InputSidePackets().HasTag(kTimestampOffset)) {
    timestamp_offset_ = cc->InputSidePackets().Tag(kTimestampOffset).Get<int>();
  }

  RET_CHECK_LT(overlap_, buffer_size_);
  // Together, the two timestamp_offset checks also guarantee buffer_size >= 1.
  RET_CHECK_GE(timestamp_offset_, 0)
      << "Negative timestamp_offset is not allowed.";
  RET_CHECK_LT(timestamp_offset_, buffer_size_)
      << "timestamp_offset has to be less than buffer_size.";
  // A negative padding would yield a negative frame index and loop count in
  // Close(), so reject it up front.
  RET_CHECK_GE(options_.padding(), 0) << "Negative padding is not allowed.";
  RET_CHECK_LT(options_.padding(), buffer_size_)
      << "padding option must be smaller than buffer size.";

  timestamp_buffer_ =
      absl::make_unique<CircularBuffer<Timestamp>>(buffer_size_);
  buffer_ = absl::make_unique<CircularBuffer<tf::Tensor>>(buffer_size_);
  // Start padding is pushed together with the first input, so fewer real
  // inputs are needed before the first output.
  steps_until_output_ = buffer_size_ - options_.padding();
  initialized_ = false;
  return absl::OkStatus();
}

// Appends the incoming tensor (and its timestamp) to the buffers and emits a
// concatenated batch whenever the countdown to the next output reaches zero.
// The first input is additionally pushed `padding` times up front.
absl::Status LappedTensorBufferCalculator::Process(CalculatorContext* cc) {
  // Copying a tensor here is shallow, hence cheap.
  tf::Tensor frame = cc->Inputs().Index(0).Get<tf::Tensor>();
  if (options_.add_batch_dim_to_tensors()) {
    RET_CHECK_OK(AddBatchDimension(&frame));
  }
  const Timestamp frame_time = cc->InputTimestamp();

  if (!initialized_) {
    // Start-of-stream padding: repeat the first frame `padding` times.
    const int pad_count = options_.padding();
    for (int pad = 0; pad < pad_count; ++pad) {
      buffer_->push_back(frame);
      timestamp_buffer_->push_back(frame_time);
    }
    initialized_ = true;
  }

  buffer_->push_back(frame);
  timestamp_buffer_->push_back(frame_time);

  steps_until_output_ -= 1;
  if (steps_until_output_ > 0) {
    return absl::OkStatus();
  }
  MP_RETURN_IF_ERROR(ProcessBuffer(cc));
  return absl::OkStatus();
}

// Flushes the tail of the stream. Does nothing unless at least one input was
// processed and padding is non-zero; otherwise repeats a buffered frame until
// the pending output is due plus `padding` more times, then emits one final
// batch.
absl::Status LappedTensorBufferCalculator::Close(CalculatorContext* cc) {
  if (initialized_ && options_.padding() != 0) {
    // NOTE(review): this index is meant to select the most recently buffered
    // frame. Whether it still does once the buffer has wrapped depends on how
    // CircularBuffer::Get interprets its index -- confirm.
    const int newest_index = buffer_size_ - steps_until_output_ - 1;
    const auto& tail_frame = buffer_->Get(newest_index);
    const int pad_count = steps_until_output_ + options_.padding();
    for (int pad = 0; pad < pad_count; ++pad) {
      buffer_->push_back(tail_frame);
      timestamp_buffer_->push_back(cc->InputTimestamp());
    }
    MP_RETURN_IF_ERROR(ProcessBuffer(cc));
  }
  return absl::OkStatus();
}

// Reshapes `input_tensor` in place so that it gains a leading dimension of
// size 1. Does nothing when add_batch_dim_to_tensors is disabled in the
// calculator options. Returns an error status if the reshape is refused.
absl::Status LappedTensorBufferCalculator::AddBatchDimension(
    tf::Tensor* input_tensor) {
  if (!options_.add_batch_dim_to_tensors()) {
    return absl::OkStatus();
  }

  tf::TensorShape batched_shape(input_tensor->shape());
  batched_shape.InsertDim(0, 1);
  // The tensor is copied onto itself with the new shape; CopyFrom reports
  // through its boolean result whether that was accepted.
  const bool reshaped = input_tensor->CopyFrom(*input_tensor, batched_shape);
  RET_CHECK(reshaped)
      << "Could not add 0th dimension to tensor without changing its shape."
      << " Current shape: " << input_tensor->shape().DebugString();
  return absl::OkStatus();
}

// Concatenates all buffered tensors into one tensor and sends it on output 0,
// stamped with the buffered timestamp at index `timestamp_offset_`. When a
// second output stream exists, the buffered timestamps are sent on it with the
// same timestamp. Finally resets the countdown to the next output.
absl::Status LappedTensorBufferCalculator::ProcessBuffer(
    CalculatorContext* cc) {
  const std::vector<tf::Tensor> frames(buffer_->begin(), buffer_->end());
  auto batch = ::absl::make_unique<tf::Tensor>();
  const tf::Status concat_status = tf::tensor::Concat(frames, batch.get());
  RET_CHECK(concat_status.ok()) << concat_status.ToString();

  const Timestamp output_time = timestamp_buffer_->Get(timestamp_offset_);

  // Output the concatenated tensor.
  cc->Outputs().Index(0).Add(batch.release(), output_time);

  if (cc->Outputs().NumEntries() > 1) {
    // Output the vector of timestamps that make up this batch.
    auto batch_times = ::absl::make_unique<std::vector<Timestamp>>(
        timestamp_buffer_->begin(), timestamp_buffer_->end());
    RET_CHECK_EQ(batch_times->size(), buffer_size_)
        << "Output timestamp size is not correct.";
    cc->Outputs().Index(1).Add(batch_times.release(), output_time);
  }

  // The next batch is due after (buffer_size - overlap) further inputs.
  steps_until_output_ = buffer_size_ - overlap_;
  return absl::OkStatus();
}

}  // namespace mediapipe