chromium/third_party/mediapipe/src/mediapipe/calculators/core/merge_calculator.cc

// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "absl/log/absl_log.h"
#include "mediapipe/framework/api2/node.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/framework/port/status.h"

namespace mediapipe {
namespace api2 {

// This calculator takes a set of input streams and combines them into a single
// output stream. The packets from different streams do not need to contain the
// same type. If there are packets arriving at the same time from two or more
// input streams, the packet corresponding to the input stream with the smallest
// index is passed to the output and the rest are ignored.
//
// Example use-case:
// Suppose we have two (or more) different algorithms for detecting shot
// boundaries and we need to merge their packets into a single stream. The
// algorithms may emit shot boundaries at the same time and their output types
// may not be compatible. Subsequent calculators that process the merged stream
// may be interested only in the timestamps of the shot boundary packets and so
// it may not even need to inspect the values stored inside the packets.
//
// Example config:
// node {
//   calculator: "MergeCalculator"
//   input_stream: "shot_info1"
//   input_stream: "shot_info2"
//   input_stream: "shot_info3"
//   output_stream: "merged_shot_infos"
// }
//
class MergeCalculator : public Node {
 public:
  static constexpr Input<AnyType>::Multiple kIn{""};
  static constexpr Output<AnyType> kOut{""};

  MEDIAPIPE_NODE_CONTRACT(kIn, kOut);

  static absl::Status UpdateContract(CalculatorContract* cc) {
    RET_CHECK_GT(kIn(cc).Count(), 0) << "Needs at least one input stream";
    if (kIn(cc).Count() == 1) {
      ABSL_LOG(WARNING)
          << "MergeCalculator expects multiple input streams to merge but is "
             "receiving only one. Make sure the calculator is configured "
             "correctly or consider removing this calculator to reduce "
             "unnecessary overhead.";
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Output the packet from the first input stream with a packet ready at this
    // timestamp.
    for (const auto& input : kIn(cc)) {
      if (!input.IsEmpty()) {
        kOut(cc).Send(input.packet());
        return absl::OkStatus();
      }
    }

    ABSL_LOG(WARNING) << "Empty input packets at timestamp "
                      << cc->InputTimestamp().Value();

    return absl::OkStatus();
  }
};

MEDIAPIPE_REGISTER_NODE(MergeCalculator);

}  // namespace api2
}  // namespace mediapipe