chromium/third_party/mediapipe/src/mediapipe/calculators/core/previous_loopback_calculator.cc

// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <deque>

#include "mediapipe/framework/api2/node.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/timestamp.h"

namespace mediapipe {
namespace api2 {

// PreviousLoopbackCalculator is useful when a graph needs to process an input
// together with some previous output.
//
// For the first packet that arrives on the MAIN input, the timestamp bound is
// advanced on the PREV_LOOP. Downstream calculators will see this as an empty
// packet. This way they are not kept waiting for the previous output, which
// for the first iteration does not exist.
//
// Thereafter,
// - Each non-empty MAIN packet results in:
//   a) a PREV_LOOP packet with contents of the LOOP packet received at the
//      timestamp of the previous non-empty MAIN packet
//   b) or in a PREV_LOOP timestamp bound update if the LOOP packet was empty.
// - Each empty MAIN packet indicating timestamp bound update results in a
//   PREV_LOOP timestamp bound update.
//
// Example config:
// node {
//   calculator: "PreviousLoopbackCalculator"
//   input_stream: "MAIN:input"
//   input_stream: "LOOP:output"
//   input_stream_info: { tag_index: 'LOOP' back_edge: true }
//   output_stream: "PREV_LOOP:prev_output"
// }
// node {
//   calculator: "FaceTracker"
//   input_stream: "VIDEO:input"
//   input_stream: "PREV_TRACK:prev_output"
//   output_stream: "TRACK:output"
// }
class PreviousLoopbackCalculator : public Node {
 public:
  static constexpr Input<AnyType> kMain{"MAIN"};
  static constexpr Input<AnyType> kLoop{"LOOP"};
  static constexpr Output<SameType<kLoop>> kPrevLoop{"PREV_LOOP"};
  // TODO: an optional PREV_TIMESTAMP output could be added to
  // carry the original timestamp of the packet on PREV_LOOP.

  MEDIAPIPE_NODE_CONTRACT(kMain, kLoop, kPrevLoop,
                          StreamHandler("ImmediateInputStreamHandler"),
                          TimestampChange::Arbitrary());

  static absl::Status UpdateContract(CalculatorContract* cc) {
    // Process() function is invoked in response to MAIN/LOOP stream timestamp
    // bound updates.
    cc->SetProcessTimestampBounds(true);
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    kPrevLoop(cc).SetHeader(kLoop(cc).Header());
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Non-empty packets and empty packets indicating timestamp bound updates
    // are guaranteed to have timestamps greater than timestamps of previous
    // packets within the same stream. Calculator tracks and operates on such
    // packets.

    const PacketBase& main_packet = kMain(cc).packet();
    if (prev_main_ts_ < main_packet.timestamp()) {
      Timestamp loop_timestamp;
      if (!main_packet.IsEmpty()) {
        loop_timestamp = prev_non_empty_main_ts_;
        prev_non_empty_main_ts_ = main_packet.timestamp();
      } else {
        // Calculator advances PREV_LOOP timestamp bound in response to empty
        // MAIN packet, hence not caring about corresponding loop packet.
        loop_timestamp = Timestamp::Unset();
      }
      main_packet_specs_.push_back({main_packet.timestamp(), loop_timestamp});
      prev_main_ts_ = main_packet.timestamp();
    }

    const PacketBase& loop_packet = kLoop(cc).packet();
    if (prev_loop_ts_ < loop_packet.timestamp()) {
      loop_packets_.push_back(loop_packet);
      prev_loop_ts_ = loop_packet.timestamp();
    }

    while (!main_packet_specs_.empty() && !loop_packets_.empty()) {
      // The earliest MAIN packet.
      MainPacketSpec main_spec = main_packet_specs_.front();
      // The earliest LOOP packet.
      const PacketBase& loop_candidate = loop_packets_.front();
      // Match LOOP and MAIN packets.
      if (main_spec.loop_timestamp < loop_candidate.timestamp()) {
        // No LOOP packet can match the MAIN packet under review.
        kPrevLoop(cc).SetNextTimestampBound(main_spec.timestamp + 1);
        main_packet_specs_.pop_front();
      } else if (main_spec.loop_timestamp > loop_candidate.timestamp()) {
        // No MAIN packet can match the LOOP packet under review.
        loop_packets_.pop_front();
      } else {
        // Exact match found.
        if (loop_candidate.IsEmpty()) {
          // However, LOOP packet is empty.
          kPrevLoop(cc).SetNextTimestampBound(main_spec.timestamp + 1);
        } else {
          // Avoids sending leftovers to a stream that's already closed.
          if (!kPrevLoop(cc).IsClosed()) {
            kPrevLoop(cc).Send(loop_candidate.At(main_spec.timestamp));
          }
        }
        loop_packets_.pop_front();
        main_packet_specs_.pop_front();
      }

      // We can close PREV_LOOP output stream as soon as we processed last
      // possible MAIN packet. That can happen in two cases:
      // a) Non-empty MAIN packet has been received with Timestamp::Max()
      // b) Empty MAIN packet has been received with Timestamp::Max() indicating
      //    MAIN is done.
      if (main_spec.timestamp == Timestamp::Done().PreviousAllowedInStream()) {
        kPrevLoop(cc).Close();
      }
    }

    return absl::OkStatus();
  }

 private:
  struct MainPacketSpec {
    Timestamp timestamp;
    // Expected timestamp of the packet from LOOP stream that corresponds to the
    // packet from MAIN stream descirbed by this spec.
    Timestamp loop_timestamp;
  };

  // Contains specs for MAIN packets which only can be:
  // - non-empty packets
  // - empty packets indicating timestamp bound updates
  //
  // Sorted according to packet timestamps.
  std::deque<MainPacketSpec> main_packet_specs_;
  Timestamp prev_main_ts_ = Timestamp::Unstarted();
  Timestamp prev_non_empty_main_ts_ = Timestamp::Unstarted();

  // Contains LOOP packets which only can be:
  // - the very first empty packet
  // - non empty packets
  // - empty packets indicating timestamp bound updates
  //
  // Sorted according to packet timestamps.
  std::deque<PacketBase> loop_packets_;
  // Using "Timestamp::Unset" instead of "Timestamp::Unstarted" in order to
  // allow addition of the very first empty packet (which doesn't indicate
  // timestamp bound change necessarily).
  Timestamp prev_loop_ts_ = Timestamp::Unset();
};
MEDIAPIPE_REGISTER_NODE(PreviousLoopbackCalculator);

}  // namespace api2
}  // namespace mediapipe