chromium/third_party/mediapipe/src/mediapipe/calculators/core/packet_sequencer_calculator.cc

// Copyright 2022 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <memory>

#include "mediapipe/framework/api2/contract.h"
#include "mediapipe/framework/api2/node.h"
#include "mediapipe/framework/api2/packet.h"
#include "mediapipe/framework/api2/port.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/status.h"

namespace mediapipe {
namespace api2 {

// This calculator assigns a timestamp to each "INPUT" packet reflecting
// the most recent "TICK" timestamp.
//
// Each "TICK" timestamp is propagated as a settled "OUTPUT" timestamp.
// This allows "TICK" packets to be processed right away.
// When an "INPUT" packet arrives, it is sent to the "OUTPUT" stream with
// the next unsettled "OUTPUT" timestamp, which is normally one greater than
// the most recent "TICK" timestamp.
//
// If a "TICK" packet and an "INPUT" packet arrive together, the "OUTPUT"
// packet timestamp is derived from the previous "TICK" timestamp,
// and the new "OUTPUT" bound is derived from the current "TICK" timestamp.
// This allows the current "INPUT" packet to cover the current "TICK" timestamp.
//
// Example config:
// node {
//   calculator: "PacketSequencerCalculator"
//   input_stream: "INPUT:switch_selection"
//   input_stream: "TICK:input_image"
//   input_stream: "TICK:input_audio"
//   output_stream: "OUTPUT:switch_selection_timed"
// }
//
class PacketSequencerCalculator : public Node {
 public:
  static constexpr Input<AnyType>::Multiple kInput{"INPUT"};
  static constexpr Input<AnyType>::Multiple kTick{"TICK"};
  static constexpr Output<AnyType>::Multiple kOutput{"OUTPUT"};

  MEDIAPIPE_NODE_CONTRACT(kInput, kTick, kOutput,
                          StreamHandler("ImmediateInputStreamHandler"),
                          TimestampChange::Arbitrary());

  static absl::Status UpdateContract(CalculatorContract* cc) {
    RET_CHECK_EQ(kInput(cc).Count(), kOutput(cc).Count());
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Pass through any input packets at the output stream bound.
    for (int i = 0; i < kInput(cc).Count(); ++i) {
      Timestamp stream_bound = kOutput(cc)[i].NextTimestampBound();
      const PacketBase input_packet = kInput(cc)[i].packet();
      if (!input_packet.IsEmpty()) {
        Timestamp output_ts = std::max(Timestamp::Min(), stream_bound);
        kOutput(cc)[i].Send(input_packet.At(output_ts));
      }
    }

    // Find the new tick timestamp, if any.
    Timestamp tick_ts = Timestamp::Min();
    for (int i = 0; i < kTick(cc).Count(); ++i) {
      const PacketBase& tick_packet = kTick(cc)[i].packet();
      // For either an input packet or an empty input stream,
      // the packet timestamp indicates the latest "settled timestamp",
      // and when it arrives it equals the InputTimestamp().
      if (tick_packet.timestamp() == cc->InputTimestamp()) {
        tick_ts = std::max(tick_ts, tick_packet.timestamp());
        break;
      }
    }

    // Advance all output stream bounds past the tick timestamp.
    for (int i = 0; i < kInput(cc).Count(); ++i) {
      Timestamp stream_bound = kOutput(cc)[i].NextTimestampBound();
      if (tick_ts >= stream_bound) {
        kOutput(cc)[i].SetNextTimestampBound(tick_ts.NextAllowedInStream());
      }
    }
    return absl::OkStatus();
  }
};

MEDIAPIPE_REGISTER_NODE(PacketSequencerCalculator);
}  // namespace api2
}  // namespace mediapipe