#include "mediapipe/framework/calculator_node.h"
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include "absl/log/absl_check.h"
#include "absl/log/absl_log.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/strings/substitute.h"
#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/calculator.pb.h"
#include "mediapipe/framework/calculator_base.h"
#include "mediapipe/framework/counter_factory.h"
#include "mediapipe/framework/graph_service_manager.h"
#include "mediapipe/framework/input_stream_manager.h"
#include "mediapipe/framework/mediapipe_profiling.h"
#include "mediapipe/framework/output_stream_manager.h"
#include "mediapipe/framework/packet.h"
#include "mediapipe/framework/packet_set.h"
#include "mediapipe/framework/packet_type.h"
#include "mediapipe/framework/port.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/port/proto_ns.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/framework/port/source_location.h"
#include "mediapipe/framework/port/status_builder.h"
#include "mediapipe/framework/timestamp.h"
#include "mediapipe/framework/tool/name_util.h"
#include "mediapipe/framework/tool/status_util.h"
#include "mediapipe/framework/tool/tag_map.h"
#include "mediapipe/framework/tool/validate_name.h"
namespace mediapipe {
namespace {
// File-local helper (anonymous namespace). Takes a PacketTypeSet, a tag and
// an index, and returns a pointer to a const PacketType.
// NOTE(review): the body is elided in this view. Presumably it resolves
// (tag, index) to the matching entry of `packet_type_set`; confirm what is
// returned for an unknown tag/index before relying on a non-null result.
const PacketType* GetPacketType(const PacketTypeSet& packet_type_set,
const std::string& tag, const int index) { … }
// File-local helper (anonymous namespace). Takes a tool::TagMap by const
// reference and a set of names by value, and returns a
// std::shared_ptr<tool::TagMap>.
// NOTE(review): the body is elided in this view. Presumably the result is
// `tag_map` without the entries whose names appear in `names`; verify against
// the implementation (including whether the result can be null).
std::shared_ptr<tool::TagMap> RemoveNames(const tool::TagMap& tag_map,
std::set<std::string> names) { … }
// File-local generic helper, templated on CollectionType. `other` is taken by
// const reference and `result` as a mutable pointer of the same type, so
// `result` is the apparent output.
// NOTE(review): the body is elided in this view. Presumably copies the
// contents of `other` into `*result`; confirm whether `result` must already
// have a matching layout and whether null is tolerated.
template <class CollectionType>
void CopyCollection(const CollectionType& other, CollectionType* result) { … }
// File-local helper (anonymous namespace). Takes a PacketTypeSet, the map of
// all side packets keyed by string, and the validated graph, and returns a
// caller-owned std::unique_ptr<PacketTypeSet>.
// NOTE(review): the body is elided in this view. Presumably the returned set
// is `packet_types` minus the entries for side packets that were not supplied
// in `all_side_packets`; confirm the exact omission rule and how
// `validated_graph` is consulted.
std::unique_ptr<PacketTypeSet> RemoveOmittedPacketTypes(
const PacketTypeSet& packet_types,
const std::map<std::string, Packet>& all_side_packets,
const ValidatedGraphConfig* validated_graph) { … }
}
// Default constructor.
// NOTE(review): the body is elided in this view, so any member setup done
// here is not visible; see Initialize() for the parameterised setup step.
CalculatorNode::CalculatorNode() { … }
// Const member that maps a CalculatorContext to a Timestamp.
// NOTE(review): the body is elided in this view. Presumably the returned
// timestamp is the ordering key used when scheduling source nodes and is
// obtained from the underlying calculator; confirm.
Timestamp CalculatorNode::SourceProcessOrder(
const CalculatorContext* cc) const { … }
// Sets up this node from the validated graph and returns an absl::Status.
//
// Parameters (as visible in the signature):
//   validated_graph        - the validated graph configuration (not owned).
//   node_ref               - identifies which node of the graph this is.
//   input_stream_managers  - pointer to InputStreamManager storage.
//   output_stream_managers - pointer to OutputStreamManager storage.
//   output_side_packets    - pointer to OutputSidePacketImpl storage.
//   buffer_size_hint       - mutable int pointer, so apparently an output.
//   profiling_context      - shared profiling context, passed by value.
//   graph_service_manager  - shared service manager, passed by value.
//
// NOTE(review): the body is elided in this view. Presumably the three manager
// pointers are bases of graph-wide arrays indexed via the validated graph, and
// this method drives the Initialize*() helpers defined below; confirm both,
// and confirm what is written to `*buffer_size_hint`.
absl::Status CalculatorNode::Initialize(
const ValidatedGraphConfig* validated_graph, NodeTypeInfo::NodeRef node_ref,
InputStreamManager* input_stream_managers,
OutputStreamManager* output_stream_managers,
OutputSidePacketImpl* output_side_packets, int* buffer_size_hint,
std::shared_ptr<ProfilingContext> profiling_context,
std::shared_ptr<GraphServiceManager> graph_service_manager) { … }
// Takes the node's output side packet types and a pointer to
// OutputSidePacketImpl storage; returns an absl::Status.
// NOTE(review): the body is elided in this view. Presumably binds each of this
// node's output side packets to its slot in `output_side_packets`; confirm how
// the slots are indexed.
absl::Status CalculatorNode::InitializeOutputSidePackets(
const PacketTypeSet& output_side_packet_types,
OutputSidePacketImpl* output_side_packets) { … }
// Sets up input side packets; returns an absl::Status.
// Note that the only argument is the *output* side packet storage.
// NOTE(review): the body is elided in this view. Presumably each input side
// packet of this node is connected to the output side packet (of some node)
// that produces it, which would explain the parameter type; confirm.
absl::Status CalculatorNode::InitializeInputSidePackets(
OutputSidePacketImpl* output_side_packets) { … }
// Takes a pointer to OutputStreamManager storage; returns an absl::Status.
// NOTE(review): the body is elided in this view. Presumably associates this
// node's output streams with their managers in `output_stream_managers`;
// confirm the indexing scheme.
absl::Status CalculatorNode::InitializeOutputStreams(
OutputStreamManager* output_stream_managers) { … }
// Takes pointers to both input and output stream manager storage; returns an
// absl::Status.
// NOTE(review): the body is elided in this view. Presumably the output stream
// managers are needed here so that each of this node's input streams can be
// registered with the upstream output that feeds it; confirm.
absl::Status CalculatorNode::InitializeInputStreams(
InputStreamManager* input_stream_managers,
OutputStreamManager* output_stream_managers) { … }
// Takes an InputStreamHandlerConfig and the input stream packet types;
// returns an absl::Status.
// NOTE(review): the body is elided in this view. Presumably instantiates the
// input stream handler selected by `handler_config`; confirm how an
// unknown/unregistered handler is reported.
absl::Status CalculatorNode::InitializeInputStreamHandler(
const InputStreamHandlerConfig& handler_config,
const PacketTypeSet& input_stream_types) { … }
// Output-side counterpart of InitializeInputStreamHandler(): takes an
// OutputStreamHandlerConfig and the output stream packet types; returns an
// absl::Status.
// NOTE(review): the body is elided in this view. Presumably instantiates the
// output stream handler selected by `handler_config`; confirm.
absl::Status CalculatorNode::InitializeOutputStreamHandler(
const OutputStreamHandlerConfig& handler_config,
const PacketTypeSet& output_stream_types) { … }
// Takes a mutable CalculatorContext and returns an absl::Status.
// NOTE(review): the body is elided in this view. Presumably attaches the
// context's input/output stream shards to this node's stream handlers;
// confirm, and confirm whether a null context is permitted.
absl::Status CalculatorNode::ConnectShardsToStreams(
CalculatorContext* calculator_context) { … }
// Takes an executor identified by a string.
// NOTE(review): the body is elided in this view. Presumably records the
// executor name for later scheduling; confirm at which lifecycle stages this
// may be called.
void CalculatorNode::SetExecutor(const std::string& executor) { … }
// Const boolean query.
// NOTE(review): body elided; presumably true once the node has been prepared
// for a run (see PrepareForRun) — confirm the exact state checked.
bool CalculatorNode::Prepared() const { … }
// Const boolean query.
// NOTE(review): body elided; presumably true once the node has been opened
// (see OpenNode) — confirm the exact state checked.
bool CalculatorNode::Opened() const { … }
// Const boolean query.
// NOTE(review): body elided; presumably true while the node is in its active
// state (see ActivateNode) — confirm the exact state checked.
bool CalculatorNode::Active() const { … }
// Const boolean query.
// NOTE(review): body elided; presumably true once the node has been closed
// (see CloseNode) — confirm the exact state checked.
bool CalculatorNode::Closed() const { … }
// Takes a maximum queue size as an int.
// NOTE(review): body elided; presumably applies the limit to this node's
// input streams. The meaning of zero or negative values is not visible
// here — confirm.
void CalculatorNode::SetMaxInputStreamQueueSize(int max_queue_size) { … }
// Prepares the node for a graph run and returns an absl::Status.
//
// Parameters (as visible in the signature):
//   all_side_packets            - side packets keyed by string.
//   service_packets             - service packets keyed by string.
//   ready_for_open_callback     - nullary callback, passed by value.
//   source_node_opened_callback - nullary callback, passed by value.
//   schedule_callback           - callback receiving a CalculatorContext*.
//   error_callback              - callback receiving an absl::Status.
//   counter_factory             - CounterFactory pointer (ownership not
//                                 visible here).
//
// NOTE(review): the body is elided in this view. Presumably the callbacks are
// stored and invoked later from the scheduling/readiness methods below rather
// than during this call; confirm, and confirm which of them may be empty.
absl::Status CalculatorNode::PrepareForRun(
const std::map<std::string, Packet>& all_side_packets,
const std::map<std::string, Packet>& service_packets,
std::function<void()> ready_for_open_callback,
std::function<void()> source_node_opened_callback,
std::function<void(CalculatorContext*)> schedule_callback,
std::function<void(absl::Status)> error_callback,
CounterFactory* counter_factory) { … }
namespace {
// File-local helper (anonymous namespace). Returns a Packet by (const) value
// for the given OutputSidePacket.
// NOTE(review): body elided; presumably extracts the packet currently held by
// `out` — confirm what is returned when no packet has been set.
const Packet GetPacket(const OutputSidePacket& out) { … }
// File-local helper (anonymous namespace). Takes a CalculatorContext and
// returns an absl::Status.
// NOTE(review): body elided; presumably re-emits the context's already-set
// output side packets (see GetPacket above) — confirm when this is used.
absl::Status ResendSidePackets(CalculatorContext* cc) { … }
}
// Boolean query over a CalculatorContext (non-const member).
// NOTE(review): body elided; presumably reports whether this node's outputs
// cannot change between runs, so previously produced side packets could be
// reused — confirm the precise criteria.
bool CalculatorNode::OutputsAreConstant(CalculatorContext* cc) { … }
// Opens the node; returns an absl::Status.
// NOTE(review): body elided; presumably invokes the calculator's Open step
// and updates the state reported by Opened() — confirm.
absl::Status CalculatorNode::OpenNode() { … }
// Takes no arguments and returns nothing.
// NOTE(review): body elided; presumably moves the node into the state
// reported by Active() — confirm the required prior state.
void CalculatorNode::ActivateNode() { … }
// Takes no arguments and returns nothing.
// NOTE(review): body elided; presumably closes all of this node's input
// streams — confirm whether repeated calls are safe.
void CalculatorNode::CloseInputStreams() { … }
// Takes a mutable OutputStreamShardSet pointer and returns nothing.
// NOTE(review): body elided; presumably closes this node's output streams
// using the given shard set — confirm whether `outputs` may be null.
void CalculatorNode::CloseOutputStreams(OutputStreamShardSet* outputs) { … }
// Closes the node; returns an absl::Status. Receives the overall graph status
// and a flag saying whether the graph run has ended.
// NOTE(review): body elided; presumably invokes the calculator's Close step
// and closes the node's streams. How `graph_status` and `graph_run_ended`
// alter that behaviour is not visible here — confirm.
absl::Status CalculatorNode::CloseNode(const absl::Status& graph_status,
bool graph_run_ended) { … }
// Post-run cleanup hook; receives the final graph status and returns nothing.
// NOTE(review): body elided; presumably releases per-run state so the node
// can be prepared again — confirm what is reset.
void CalculatorNode::CleanupAfterRun(const absl::Status& graph_status) { … }
// Takes no arguments and returns nothing.
// NOTE(review): body elided; presumably repeatedly checks for ready input and
// hands work to the schedule callback given to PrepareForRun — confirm the
// loop's exit conditions and locking.
void CalculatorNode::SchedulingLoop() { … }
// Const boolean query.
// NOTE(review): body elided; presumably true once everything OpenNode()
// depends on (e.g. input side packets / stream headers) is available —
// confirm the exact conditions.
bool CalculatorNode::ReadyForOpen() const { … }
// Notification-style method: no arguments, no return value.
// NOTE(review): body elided; presumably called when all input stream headers
// are available and then re-evaluates readiness — confirm.
void CalculatorNode::InputStreamHeadersReady() { … }
// Notification-style method: no arguments, no return value.
// NOTE(review): body elided; presumably called when all input side packets
// are available and then re-evaluates readiness — confirm.
void CalculatorNode::InputSidePacketsReady() { … }
// Takes no arguments and returns nothing.
// NOTE(review): body elided; presumably tests whether the node has just
// become ready for opening and, if so, fires the ready-for-open callback
// given to PrepareForRun — confirm how double-firing is prevented.
void CalculatorNode::CheckIfBecameReady() { … }
// Notification-style method: no arguments, no return value.
// NOTE(review): body elided; presumably runs after the node has been opened
// and, for source nodes, triggers the source-node-opened callback given to
// PrepareForRun — confirm.
void CalculatorNode::NodeOpened() { … }
// Takes no arguments and returns nothing.
// NOTE(review): body elided; presumably the counterpart of
// TryToBeginScheduling(), marking the end of a scheduling attempt — confirm
// whether it re-enters the scheduling loop.
void CalculatorNode::EndScheduling() { … }
// Takes no arguments and returns a bool.
// NOTE(review): body elided; presumably returns true when the caller has
// acquired the right to schedule this node and must later call
// EndScheduling() — confirm the meaning of a false result.
bool CalculatorNode::TryToBeginScheduling() { … }
// Const member returning a std::string intended for debugging output.
// NOTE(review): body elided; presumably a rendering of this node's input
// stream names — the exact format is not visible here.
std::string CalculatorNode::DebugInputStreamNames() const { … }
// Const member returning a std::string intended for debugging output.
// NOTE(review): body elided; presumably a human-readable identifier for this
// node — the exact format is not visible here.
std::string CalculatorNode::DebugName() const { … }
// Runs one processing step for the given CalculatorContext and returns an
// absl::Status.
// NOTE(review): body elided; presumably invokes the calculator's Process step
// on the context's current input set and propagates its outputs — confirm how
// errors and end-of-stream are handled.
absl::Status CalculatorNode::ProcessNode(
CalculatorContext* calculator_context) { … }
// Takes two InputStreamManager::QueueSizeCallback values (by value): one for
// the "becomes full" transition and one for "becomes not full".
// NOTE(review): body elided; presumably forwards both callbacks to this
// node's input stream handling — confirm when they are invoked.
void CalculatorNode::SetQueueSizeCallbacks(
InputStreamManager::QueueSizeCallback becomes_full_callback,
InputStreamManager::QueueSizeCallback becomes_not_full_callback) { … }
}