chromium/third_party/mediapipe/src/mediapipe/calculators/core/flow_limiter_calculator.cc

// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <deque>
#include <iterator>
#include <map>
#include <utility>
#include <vector>

#include "mediapipe/calculators/core/flow_limiter_calculator.pb.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/util/header_util.h"

namespace mediapipe {

// Tag of the input stream that signals when a frame has finished processing.
constexpr char kFinishedTag[] = "FINISHED";
// Tag of the optional bool output stream reporting accept/drop transitions.
constexpr char kAllowTag[] = "ALLOW";
// Tag of the optional int input side packet that overrides `max_in_flight`.
constexpr char kMaxInFlightTag[] = "MAX_IN_FLIGHT";
// Tag of the optional input side packet / input stream carrying
// FlowLimiterCalculatorOptions.
constexpr char kOptionsTag[] = "OPTIONS";

// FlowLimiterCalculator is used to limit the number of frames in flight
// by dropping input frames when necessary.
//
// The input stream "FINISHED" is used to signal the FlowLimiterCalculator
// when a frame is finished processing.  Either a non-empty "FINISHED" packet
// or a timestamp bound should be received for each processed frame.
//
// The combination of `max_in_flight: 1` and `max_in_queue: 1` generally gives
// the best throughput/latency balance.  Throughput is nearly optimal because
// the graph is never idle: there is always something in the queue.  Latency is
// nearly optimal because the queue always stores the latest available frame.
//
// Increasing `max_in_flight` to 2 or more can yield better throughput
// when the graph exhibits a high degree of pipeline parallelism.  Decreasing
// `max_in_queue` to 0 can yield a better average latency, but at the cost of
// lower throughput (lower framerate) due to the time during which the graph
// is idle awaiting the next input frame.
//
// Example config:
// node {
//   calculator: "FlowLimiterCalculator"
//   input_stream: "raw_frames"
//   input_stream: "FINISHED:finished"
//   input_stream_info: {
//     tag_index: 'FINISHED'
//     back_edge: true
//   }
//   output_stream: "sampled_frames"
//   output_stream: "ALLOW:allowed_timestamps"
// }
//
// The "ALLOW" stream indicates the transition between accepting frames and
// dropping frames.  "ALLOW = true" indicates the start of accepting frames
// including the current timestamp, and "ALLOW = false" indicates the start of
// dropping frames including the current timestamp.
//
// FlowLimiterCalculator provides limited support for multiple input streams.
// The first input stream is treated as the main input stream and successive
// input streams are treated as auxiliary input streams.  The auxiliary input
// streams are limited to timestamps allowed by the "ALLOW" stream.
//
class FlowLimiterCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    auto& side_inputs = cc->InputSidePackets();
    side_inputs.Tag(kOptionsTag).Set<FlowLimiterCalculatorOptions>().Optional();
    cc->Inputs()
        .Tag(kOptionsTag)
        .Set<FlowLimiterCalculatorOptions>()
        .Optional();
    RET_CHECK_GE(cc->Inputs().NumEntries(""), 1);
    for (int i = 0; i < cc->Inputs().NumEntries(""); ++i) {
      cc->Inputs().Get("", i).SetAny();
      cc->Outputs().Get("", i).SetSameAs(&(cc->Inputs().Get("", i)));
    }
    cc->Inputs().Get("FINISHED", 0).SetAny();
    cc->InputSidePackets().Tag(kMaxInFlightTag).Set<int>().Optional();
    cc->Outputs().Tag(kAllowTag).Set<bool>().Optional();
    cc->SetInputStreamHandler("ImmediateInputStreamHandler");
    cc->SetProcessTimestampBounds(true);
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    options_ = cc->Options<FlowLimiterCalculatorOptions>();
    options_ = tool::RetrieveOptions(options_, cc->InputSidePackets());
    if (cc->InputSidePackets().HasTag(kMaxInFlightTag)) {
      options_.set_max_in_flight(
          cc->InputSidePackets().Tag(kMaxInFlightTag).Get<int>());
    }
    input_queues_.resize(cc->Inputs().NumEntries(""));
    allowed_[Timestamp::Unset()] = true;
    RET_CHECK_OK(CopyInputHeadersToOutputs(cc->Inputs(), &(cc->Outputs())));
    return absl::OkStatus();
  }

  // Releases input packets allowed by the max_in_flight constraint.
  absl::Status Process(CalculatorContext* cc) final {
    options_ = tool::RetrieveOptions(options_, cc->Inputs());

    // Process the FINISHED input stream.
    Packet finished_packet = cc->Inputs().Tag(kFinishedTag).Value();
    if (finished_packet.Timestamp() == cc->InputTimestamp()) {
      while (!frames_in_flight_.empty() &&
             frames_in_flight_.front() <= finished_packet.Timestamp()) {
        frames_in_flight_.pop_front();
      }
    }

    // Process the frame input streams.
    for (int i = 0; i < cc->Inputs().NumEntries(""); ++i) {
      Packet packet = cc->Inputs().Get("", i).Value();
      if (!packet.IsEmpty()) {
        input_queues_[i].push_back(packet);
      }
    }

    // Abandon expired frames in flight.  Note that old frames are abandoned
    // when much newer frame timestamps arrive regardless of elapsed time.
    TimestampDiff timeout = options_.in_flight_timeout();
    Timestamp latest_ts = cc->Inputs().Get("", 0).Value().Timestamp();
    if (timeout > 0 && latest_ts == cc->InputTimestamp() &&
        latest_ts < Timestamp::Max()) {
      while (!frames_in_flight_.empty() &&
             (latest_ts - frames_in_flight_.front()) > timeout) {
        frames_in_flight_.pop_front();
      }
    }

    // Release allowed frames from the main input queue.
    auto& input_queue = input_queues_[0];
    while (ProcessingAllowed() && !input_queue.empty()) {
      Packet packet = input_queue.front();
      input_queue.pop_front();
      cc->Outputs().Get("", 0).AddPacket(packet);
      SendAllow(true, packet.Timestamp(), cc);
      frames_in_flight_.push_back(packet.Timestamp());
    }

    // Limit the number of queued frames.
    // Note that frames can be dropped after frames are released because
    // frame-packets and FINISH-packets never arrive in the same Process call.
    while (input_queue.size() > options_.max_in_queue()) {
      Packet packet = input_queue.front();
      input_queue.pop_front();
      SendAllow(false, packet.Timestamp(), cc);
    }

    // Propagate the input timestamp bound.
    if (!input_queue.empty()) {
      Timestamp bound = input_queue.front().Timestamp();
      SetNextTimestampBound(bound, &cc->Outputs().Get("", 0));
    } else {
      Timestamp bound =
          cc->Inputs().Get("", 0).Value().Timestamp().NextAllowedInStream();
      SetNextTimestampBound(bound, &cc->Outputs().Get("", 0));
      if (cc->Outputs().HasTag(kAllowTag)) {
        SetNextTimestampBound(bound, &cc->Outputs().Tag(kAllowTag));
      }
    }

    ProcessAuxiliaryInputs(cc);

    // Discard old ALLOW ranges.
    Timestamp input_bound = InputTimestampBound(cc);
    auto first_range = std::prev(allowed_.upper_bound(input_bound));
    allowed_.erase(allowed_.begin(), first_range);
    return absl::OkStatus();
  }

  int LedgerSize() {
    int result = frames_in_flight_.size() + allowed_.size();
    for (const auto& queue : input_queues_) {
      result += queue.size();
    }
    return result;
  }

 private:
  // Returns true if an additional frame can be released for processing.
  // The "ALLOW" output stream indicates this condition at each input frame.
  bool ProcessingAllowed() {
    return frames_in_flight_.size() < options_.max_in_flight();
  }

  // Outputs a packet indicating whether a frame was sent or dropped.
  void SendAllow(bool allow, Timestamp ts, CalculatorContext* cc) {
    if (cc->Outputs().HasTag(kAllowTag)) {
      cc->Outputs().Tag(kAllowTag).AddPacket(MakePacket<bool>(allow).At(ts));
    }
    allowed_[ts] = allow;
  }

  // Returns true if a timestamp falls within a range of allowed timestamps.
  bool IsAllowed(Timestamp timestamp) {
    auto it = allowed_.upper_bound(timestamp);
    return std::prev(it)->second;
  }

  // Sets the timestamp bound or closes an output stream.
  void SetNextTimestampBound(Timestamp bound, OutputStream* stream) {
    if (bound > Timestamp::Max()) {
      stream->Close();
    } else {
      stream->SetNextTimestampBound(bound);
    }
  }

  // Returns the lowest unprocessed input Timestamp.
  Timestamp InputTimestampBound(CalculatorContext* cc) {
    Timestamp result = Timestamp::Done();
    for (int i = 0; i < input_queues_.size(); ++i) {
      auto& queue = input_queues_[i];
      auto& stream = cc->Inputs().Get("", i);
      Timestamp bound = queue.empty()
                            ? stream.Value().Timestamp().NextAllowedInStream()
                            : queue.front().Timestamp();
      result = std::min(result, bound);
    }
    return result;
  }

  // Releases input packets up to the latest settled input timestamp.
  void ProcessAuxiliaryInputs(CalculatorContext* cc) {
    Timestamp settled_bound = cc->Outputs().Get("", 0).NextTimestampBound();
    for (int i = 1; i < cc->Inputs().NumEntries(""); ++i) {
      // Release settled frames from each input queue.
      while (!input_queues_[i].empty() &&
             input_queues_[i].front().Timestamp() < settled_bound) {
        Packet packet = input_queues_[i].front();
        input_queues_[i].pop_front();
        if (IsAllowed(packet.Timestamp())) {
          cc->Outputs().Get("", i).AddPacket(packet);
        }
      }

      // Propagate each input timestamp bound.
      if (!input_queues_[i].empty()) {
        Timestamp bound = input_queues_[i].front().Timestamp();
        SetNextTimestampBound(bound, &cc->Outputs().Get("", i));
      } else {
        Timestamp bound =
            cc->Inputs().Get("", i).Value().Timestamp().NextAllowedInStream();
        SetNextTimestampBound(bound, &cc->Outputs().Get("", i));
      }
    }
  }

 private:
  FlowLimiterCalculatorOptions options_;
  std::vector<std::deque<Packet>> input_queues_;
  std::deque<Timestamp> frames_in_flight_;
  std::map<Timestamp, bool> allowed_;
};
// Registers the calculator with the framework; graph configs refer to it as
// `calculator: "FlowLimiterCalculator"` (see the example config above).
REGISTER_CALCULATOR(FlowLimiterCalculator);

}  // namespace mediapipe