chromium/third_party/mediapipe/src/mediapipe/framework/stream_handler/fixed_size_input_stream_handler.h

// Copyright 2023 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef MEDIAPIPE_FRAMEWORK_STREAM_HANDLER_FIXED_SIZE_INPUT_STREAM_HANDLER_H_
#define MEDIAPIPE_FRAMEWORK_STREAM_HANDLER_FIXED_SIZE_INPUT_STREAM_HANDLER_H_

#include <cstdint>
#include <list>
#include <memory>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/calculator_context_manager.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/collection_item_id.h"
#include "mediapipe/framework/input_stream_handler.h"
#include "mediapipe/framework/stream_handler/default_input_stream_handler.h"

namespace mediapipe {

// Input stream handler that limits each input queue to a maximum of
// target_queue_size packets, discarding older packets as needed.  When a
// timestamp is dropped from a stream, it is dropped from all others as well.
//
// For example, a calculator node with one input stream and the following input
// stream handler specs:
//
// node {
//   calculator: "CalculatorRunningAtOneFps"
//   input_stream: "packets_streaming_in_at_ten_fps"
//   input_stream_handler {
//     input_stream_handler: "FixedSizeInputStreamHandler"
//   }
// }
//
// will always try to keep the newest packet in the input stream.
//
// A few details: FixedSizeInputStreamHandler takes action when any stream grows
// to trigger_queue_size or larger.  It then keeps at most target_queue_size
// packets in every InputStreamImpl.  Every stream is truncated at the same
// timestamp, so that each included timestamp delivers the same packets as
// DefaultInputStreamHandler includes.
class FixedSizeInputStreamHandler : public DefaultInputStreamHandler {
 public:
  FixedSizeInputStreamHandler() = delete;
  FixedSizeInputStreamHandler(std::shared_ptr<tool::TagMap> tag_map,
                              CalculatorContextManager* cc_manager,
                              const MediaPipeOptions& options,
                              bool calculator_run_in_parallel);

 private:
  // Drops packets if all input streams exceed trigger_queue_size.
  void EraseAllSurplus() ABSL_EXCLUSIVE_LOCKS_REQUIRED(erase_mutex_);

  // Returns the latest timestamp allowed before a bound.
  Timestamp PreviousAllowedInStream(Timestamp bound);

  // Returns the lowest timestamp at which a packet may arrive at any stream.
  Timestamp MinStreamBound();

  // Returns the lowest timestamp of a packet ready to process.
  Timestamp MinTimestampToProcess();

  // Keeps only the most recent target_queue_size packets in each stream
  // exceeding trigger_queue_size.  Also, discards all packets older than the
  // first kept timestamp on any stream.
  void EraseAnySurplus(bool keep_one)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(erase_mutex_);

  void EraseSurplusPackets(bool keep_one)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(erase_mutex_);

  NodeReadiness GetNodeReadiness(Timestamp* min_stream_timestamp) override;

  void AddPackets(CollectionItemId id,
                  const std::list<Packet>& packets) override;

  void MovePackets(CollectionItemId id, std::list<Packet>* packets) override;

  void FillInputSet(Timestamp input_timestamp,
                    InputStreamShardSet* input_set) override;

 private:
  int32_t trigger_queue_size_;
  int32_t target_queue_size_;
  bool fixed_min_size_;
  // Indicates that GetNodeReadiness has returned kReadyForProcess once, and
  // the corresponding call to FillInputSet has not yet completed.
  bool pending_ ABSL_GUARDED_BY(erase_mutex_);
  // The timestamp used to truncate all input streams.
  Timestamp kept_timestamp_ ABSL_GUARDED_BY(erase_mutex_);
  absl::Mutex erase_mutex_;
};

}  // namespace mediapipe

#endif  // MEDIAPIPE_FRAMEWORK_STREAM_HANDLER_FIXED_SIZE_INPUT_STREAM_HANDLER_H_