chromium/third_party/mediapipe/src/mediapipe/calculators/core/immediate_mux_calculator.cc

// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>

#include "absl/log/absl_log.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/framework/port/status.h"

namespace mediapipe {

// This Calculator multiplexes several input streams into a single
// output stream, dropping input packets with timestamps older than the
// last output packet.  In case two packets arrive with the same timestamp,
// the packet with the lower stream index will be output and the rest will
// be dropped.
//
// This Calculator optionally produces a finish indicator as its second
// output stream.  One indicator packet is produced for each input packet
// received.
//
// This Calculator can be used with an ImmediateInputStreamHandler or with the
// default ISH.
//
// This Calculator is designed to work with a Demux calculator such as
// the RoundRobinDemuxCalculator.  Therefore, packets from different
// input streams are normally not expected to have the same timestamp.
//
// NOTE: this calculator can drop packets non-deterministically, depending on
// how fast the input streams are fed. In most cases, MuxCalculator should be
// preferred. In particular, dropping packets can interfere with rate limiting
// mechanisms.
class ImmediateMuxCalculator : public CalculatorBase {
 public:
  // Declares the stream contract.  One or two output streams are accepted:
  // the first may carry any type and the optional second carries bool.
  // Every input stream is required to have the same type as the first
  // output stream.
  static absl::Status GetContract(CalculatorContract* cc);

  // Sets the calculator's timestamp offset to zero.
  absl::Status Open(CalculatorContext* cc) override;

  // Forwards each non-empty input packet to the first output stream right
  // away, except packets whose timestamp is below that stream's next
  // timestamp bound; those are dropped with a warning.  When a second output
  // stream exists, a bool packet is emitted on it for every non-empty input
  // packet, whether that packet was forwarded or dropped.
  absl::Status Process(CalculatorContext* cc) override;
};
REGISTER_CALCULATOR(ImmediateMuxCalculator);

// Validates the number of output streams and assigns packet types.
//
// Fails the RET_CHECK unless there are exactly one or two output streams.
// Output 0 accepts any type, output 1 (if present) is bool, and each input
// stream is tied to the type of output 0.
absl::Status ImmediateMuxCalculator::GetContract(CalculatorContract* cc) {
  // The condition is spelled out in full because RET_CHECK may include the
  // expression text in the failure status.
  RET_CHECK(cc->Outputs().NumEntries() >= 1 && cc->Outputs().NumEntries() <= 2)
      << "This calculator produces only one or two output streams.";

  auto& mux_output_type = cc->Outputs().Index(0);
  mux_output_type.SetAny();

  // The optional second stream carries the bool indicator packets.
  if (cc->Outputs().NumEntries() > 1) {
    cc->Outputs().Index(1).Set<bool>();
  }

  // All inputs share the type of the multiplexed output.
  const int num_inputs = cc->Inputs().NumEntries();
  for (int input_index = 0; input_index < num_inputs; ++input_index) {
    cc->Inputs().Index(input_index).SetSameAs(&mux_output_type);
  }
  return absl::OkStatus();
}

// Configures the calculator with a timestamp offset of zero and reports
// success.
absl::Status ImmediateMuxCalculator::Open(CalculatorContext* cc) {
  const TimestampDiff zero_offset(0);
  cc->SetOffset(zero_offset);
  return absl::OkStatus();
}

// Visits the input streams in index order and relays every non-empty packet.
//
// A packet is added to output 0 when its timestamp is at or beyond that
// stream's next timestamp bound; otherwise it is dropped and a warning is
// logged (at most five times).  Because adding a packet is checked against
// the bound on each iteration, a later stream's packet with the same
// timestamp as one already forwarded in this call is dropped.
//
// When a second output stream is present, one bool(true) packet is emitted
// on it per non-empty input packet, regardless of whether that packet was
// forwarded or dropped.
absl::Status ImmediateMuxCalculator::Process(CalculatorContext* cc) {
  const int num_inputs = cc->Inputs().NumEntries();
  const bool has_indicator_stream = cc->Outputs().NumEntries() >= 2;

  for (int stream_index = 0; stream_index < num_inputs; ++stream_index) {
    const Packet& candidate = cc->Inputs().Index(stream_index).Value();
    if (candidate.IsEmpty()) {
      continue;  // Nothing arrived on this stream.
    }

    auto& mux_output = cc->Outputs().Index(0);
    const bool superseded =
        candidate.Timestamp() < mux_output.NextTimestampBound();
    if (superseded) {
      ABSL_LOG_FIRST_N(WARNING, 5)
          << "Dropping a packet with timestamp " << candidate.Timestamp();
    } else {
      mux_output.AddPacket(candidate);
    }

    if (has_indicator_stream) {
      auto& indicator_output = cc->Outputs().Index(1);
      // Use the later of the input timestamp and the indicator stream's
      // next timestamp bound as the indicator's timestamp.
      const Timestamp indicator_timestamp = std::max(
          cc->InputTimestamp(), indicator_output.NextTimestampBound());
      indicator_output.Add(new bool(true), indicator_timestamp);
    }
  }
  return absl::OkStatus();
}

}  // namespace mediapipe