chromium/third_party/mediapipe/src/mediapipe/calculators/core/round_robin_demux_calculator.cc

// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "mediapipe/framework/api2/node.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/ret_check.h"

namespace mediapipe {
namespace api2 {

// Forwards the input packet to one of the n output streams "OUTPUT:0",
// "OUTPUT:1", ..., in round robin fashion.  The index of the selected output
// stream is emitted to the output stream "SELECT".  If not needed, the
// output stream "SELECT" may be omitted.
//
// Designed to run graph bottlenecks in parallel and thus reduce graph
// processing latency by parallelizing.
//
// A simple example config is:
//
// node {
//   calculator: "RoundRobinDemuxCalculator"
//   input_stream: "signal"
//   output_stream: "OUTPUT:0:signal0"
//   output_stream: "OUTPUT:1:signal1"
//   output_stream: "SELECT:select"
// }
//
// node {
//   calculator: "SlowCalculator"
//   input_stream: "signal0"
//   output_stream: "output0"
// }
//
// node {
//   calculator: "SlowCalculator"
//   input_stream: "signal1"
//   output_stream: "output1"
// }
//
// node {
//   calculator: "MuxCalculator"
//   input_stream: "INPUT:0:output0"
//   input_stream: "INPUT:1:output1"
//   input_stream: "SELECT:select"
//   output_stream: "OUTPUT:output"
//   input_stream_handler {
//     input_stream_handler: "MuxInputStreamHandler"
//   }
// }
//
// which is essentially running the following configuration in parallel with a
// concurrency level of two:
//
// node {
//   calculator: "SlowCalculator"
//   input_stream: "signal"
//   output_stream: "output"
// }
//
// If SlowCalculator has more than one output stream, the user can group the
// output with MakePairCalculator, MakeVectorCalculator, or a similar variant to
// use it with MuxCalculator and later unpack, or can create new variants of
// MuxCalculator/MuxInputStreamHandler.
class RoundRobinDemuxCalculator : public Node {
 public:
  static constexpr Input<AnyType> kIn{""};
  static constexpr Output<int>::Optional kSelect{"SELECT"};
  static constexpr Output<SameType<kIn>>::Multiple kOut{"OUTPUT"};

  MEDIAPIPE_NODE_CONTRACT(kIn, kSelect, kOut);

  absl::Status Open(CalculatorContext* cc) override {
    output_data_stream_index_ = 0;
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) override {
    kOut(cc)[output_data_stream_index_].Send(kIn(cc).packet());
    if (kSelect(cc).IsConnected()) {
      kSelect(cc).Send(output_data_stream_index_);
    }
    output_data_stream_index_ =
        (output_data_stream_index_ + 1) % kOut(cc).Count();
    return absl::OkStatus();
  }

 private:
  int output_data_stream_index_;
};

MEDIAPIPE_REGISTER_NODE(RoundRobinDemuxCalculator);

}  // namespace api2
}  // namespace mediapipe