chromium/third_party/mediapipe/src/mediapipe/framework/profiler/graph_profiler.h

// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef MEDIAPIPE_FRAMEWORK_PROFILER_GRAPH_PROFILER_H_
#define MEDIAPIPE_FRAMEWORK_PROFILER_GRAPH_PROFILER_H_

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <list>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "absl/time/time.h"
#include "mediapipe/framework/calculator.pb.h"
#include "mediapipe/framework/calculator_context.h"
#include "mediapipe/framework/calculator_profile.pb.h"
#include "mediapipe/framework/deps/clock.h"
#include "mediapipe/framework/deps/monotonic_clock.h"
#include "mediapipe/framework/executor.h"
#include "mediapipe/framework/profiler/graph_tracer.h"
#include "mediapipe/framework/profiler/sharded_map.h"
#include "mediapipe/framework/validated_graph_config.h"

namespace mediapipe {

class GlProfilingHelper;

// Identifies one packet by the stream it belongs to and its timestamp.
struct PacketId {
  // Name of the stream carrying the packet; the TAG, if any, is not included.
  std::string stream_name;
  // Timestamp of the packet.
  int64_t timestamp_usec;

  // Two ids compare equal only when both the timestamp and the stream name
  // match.
  bool operator==(const PacketId& rhs) const {
    if (timestamp_usec != rhs.timestamp_usec) {
      return false;
    }
    return stream_name == rhs.stream_name;
  }
};

// Bookkeeping kept by the profiler for a packet that is still in flight.
struct PacketInfo {
  // Number of consumers that have not yet consumed this packet.
  // This is used to decide if this PacketInfo should be discarded.
  int64_t remaining_consumer_count;
  // Time at which the packet was produced, according to the profiler's clock.
  int64_t production_time_usec;
  // Start time of the Process() call that generated the corresponding source
  // packet.
  int64_t source_process_start_usec;

  // Field-by-field equality. For testing.
  bool operator==(const PacketInfo& rhs) const {
    const bool same_consumer_count =
        remaining_consumer_count == rhs.remaining_consumer_count;
    const bool same_production_time =
        production_time_usec == rhs.production_time_usec;
    const bool same_source_start =
        source_process_start_usec == rhs.source_process_start_usec;
    return same_consumer_count && same_production_time && same_source_start;
  }
};

// For testing
class GraphProfilerTestPeer;

// Option for GraphProfiler::CaptureProfile that selects whether the resulting
// profile carries the config of the profiled graph.
enum class PopulateGraphConfig {
  // Leave the profile's `config` and `graph_trace.calculator_name` empty.
  kNo,
  // Fill `config` with the canonicalized graph config and
  // `graph_trace.calculator_name` with node names referring to that config.
  kFull,
};

// GraphProfiler keeps track of the following in microseconds based on the
// profiler clock, for each calculator
// - Open(), Process(), and Close() runtime.
// - Input stream latency: Time from when a packet was produced to when it was
// consumed by the calculator.
// - Process input latency: How long it took a packet to travel from start of
// the graph (source nodes) to reach the Calculator.
// - Process output latency: Process input latency + process runtime for a
// packet.
//
// The profiler can be configured in the graph definition:
//   profiler_config {
//     histogram_interval_size_usec : 2000000
//     num_histogram_intervals : 5
//     enable_profiler: true
//   }
//
// Because the graph definition affects the stream profiling and the profiler is
// a singleton, the profiler cannot be used with more than one graph. Thus the
// profiler disables itself and returns an empty stub if Initialize() is called
// more than once.
//
// The profiler uses the synchronized monotonic clock by default.
// The client can overwrite this by calling SetClock().
class GraphProfiler : public std::enable_shared_from_this<ProfilingContext> {
 public:
  GraphProfiler();
  ~GraphProfiler();

  // Not copyable or movable.
  GraphProfiler(const GraphProfiler&) = delete;
  GraphProfiler& operator=(const GraphProfiler&) = delete;

  // Initializes the profiler based on the input config.
  // This should be called before adding any calculator to the profiler.
  //
  // Because the graph definition affects the stream profiling and the profiler
  // is a singleton, the profiler cannot be used with more than one graph. Thus
  // the profiler disables itself and returns an empty stub if Initialize() is
  // called more than once.
  void Initialize(const ValidatedGraphConfig& validated_graph_config)
      ABSL_LOCKS_EXCLUDED(profiler_mutex_);

  // Sets the profiler clock.
  void SetClock(const std::shared_ptr<mediapipe::Clock>& clock)
      ABSL_LOCKS_EXCLUDED(profiler_mutex_);

  // Gets the profiler clock.
  const std::shared_ptr<mediapipe::Clock> GetClock() const
      ABSL_LOCKS_EXCLUDED(profiler_mutex_);

  // Pauses profiling. No-op if already paused.
  void Pause();
  // Resumes profiling. No-op if already profiling.
  void Resume();
  // Resets cumulative profiling data. This only resets the information about
  // Process() and does NOT affect information for Open() and Close() methods.
  void Reset() ABSL_LOCKS_EXCLUDED(profiler_mutex_);
  // Begins profiling for a single graph run.
  absl::Status Start(mediapipe::Executor* executor);
  // Ends profiling for a single graph run.
  absl::Status Stop();

  // Records a tracing event.
  void LogEvent(const TraceEvent& event);

  // Collects the runtime profile for Open(), Process(), and Close() of each
  // calculator in the graph. May be called at any time after the graph has been
  // initialized.
  absl::Status GetCalculatorProfiles(std::vector<CalculatorProfile>*) const
      ABSL_LOCKS_EXCLUDED(profiler_mutex_);

  // Records recent profiling and tracing data.  Includes events since the
  // previous call to CaptureProfile.
  //
  // If `populate_config` is `kFull`, `config` field of the resulting profile
  // will contain canonicalized config of the profiled graph, and
  // `graph_trace.calculator_name` will contain node names referring to that
  // config. Both fields are left empty if the option is set to `kNo`.
  absl::Status CaptureProfile(
      GraphProfile* result,
      PopulateGraphConfig populate_config = PopulateGraphConfig::kNo);

  // Writes recent profiling and tracing data to a file specified in the
  // ProfilerConfig.  Includes events since the previous call to WriteProfile.
  absl::Status WriteProfile();

  // Returns the trace event buffer.
  GraphTracer* tracer() { return packet_tracer_.get(); }

  // Creates and returns a GlProfilingHelper interface for a single GLContext.
  std::unique_ptr<GlProfilingHelper> CreateGlProfilingHelper();

  // Convenience temporary object to record scoped entry and exit.
  // Captures the start time on construction and, on destruction, reports the
  // elapsed interval to the profiler (when profiling) and logs the output
  // events to the tracer (when tracing).
  class Scope {
   public:
    // Constructs a scope.
    //
    // REQUIRES: `calculator_context` and `profiler` are not null, and must both
    // outlive this instance.
    inline explicit Scope(GraphTrace::EventType event_type,
                          CalculatorContext* calculator_context,
                          GraphProfiler* profiler)
        : calculator_method_(event_type),
          calculator_context_(*calculator_context),
          profiler_(profiler) {
      start_time_usec_ = profiler_->TimeNowUsec();
      if (profiler_->is_tracing_) {
        absl::Time time_now = absl::FromUnixMicros(start_time_usec_);
        profiler_->packet_tracer_->LogInputEvents(
            calculator_method_, &calculator_context_, time_now);
      }
    }

    inline ~Scope() {
      // Read each atomic flag exactly once. Re-reading them further down would
      // allow a concurrent change to enable a branch whose end time was never
      // captured.
      const bool is_profiling = profiler_->is_profiling_.load();
      const bool is_tracing = profiler_->is_tracing_.load();
      if (!is_profiling && !is_tracing) {
        return;
      }

      // A single clock reading is shared by the profiler and the tracer so
      // that both record the same end time for this invocation.
      const int64_t end_time_usec = profiler_->TimeNowUsec();

      if (is_profiling) {
        switch (calculator_method_) {
          case GraphTrace::OPEN:
            profiler_->SetOpenRuntime(calculator_context_, start_time_usec_,
                                      end_time_usec);
            break;

          case GraphTrace::PROCESS:
            profiler_->AddProcessSample(calculator_context_, start_time_usec_,
                                        end_time_usec);
            break;

          case GraphTrace::CLOSE:
            profiler_->SetCloseRuntime(calculator_context_, start_time_usec_,
                                       end_time_usec);
            break;

          default:
            // Other event types carry no runtime statistics.
            break;
        }
      }

      if (is_tracing) {
        const absl::Time time_now = absl::FromUnixMicros(end_time_usec);
        profiler_->packet_tracer_->LogOutputEvents(
            calculator_method_, &calculator_context_, time_now);
      }
    }

   private:
    // The calculator method (event type) this scope measures.
    const GraphTrace::EventType calculator_method_;
    // Context of the calculator being measured; not owned.
    const CalculatorContext& calculator_context_;
    // Profiler receiving the measurements; not owned.
    GraphProfiler* profiler_;
    // Profiler clock reading taken in the constructor.
    int64_t start_time_usec_;
  };

  // Returns the settings this profiler was configured with.
  const ProfilerConfig& profiler_config() { return profiler_config_; }

  // Helper method to expose the config to other profilers.
  const ValidatedGraphConfig* GetValidatedGraphConfig() {
    return validated_graph_;
  }

  // Gets a numerical identifier for this GraphProfiler object.
  uint64_t GetGraphId() { return graph_id_; }

 private:
  // This can be used to add packet info for the input streams to the graph.
  // It treats the stream defined by |stream_name| as a stream produced by a
  // source calculator and thus uses |timestamp_usec| for the packet production
  // time and source production time.
  // It is the responsibility of the caller to make sure the |timestamp_usec|
  // is valid for profiling.
  void AddPacketInfo(const TraceEvent& packet_info)
      ABSL_LOCKS_EXCLUDED(profiler_mutex_);
  static void InitializeTimeHistogram(int64_t interval_size_usec,
                                      int64_t num_intervals,
                                      TimeHistogram* histogram);
  static void ResetTimeHistogram(TimeHistogram* histogram);
  // Adds a sample to a time histogram.
  static void AddTimeSample(int64_t start_time_usec, int64_t end_time_usec,
                            TimeHistogram* histogram);

  // Adds output streams to the stream consumer count map.
  // This is needed in case an output stream is not consumed by any calculator.
  void InitializeOutputStreams(const CalculatorGraphConfig::Node& node_config);
  // Initializes input stream profiles for a calculator by adding all the input
  // streams.
  // Although this adds back edges to the profile to keep the ordering, it does
  // not add them to |stream_consumer_counts_| to avoid using them for updating
  // |source_process_start_usec| and garbage collection while profiling.
  void InitializeInputStreams(const CalculatorGraphConfig::Node& node_config,
                              int64_t interval_size_usec, int64_t num_intervals,
                              CalculatorProfile* calculator_profile);
  // Returns the input stream back edges for a calculator.
  std::set<int> GetBackEdgeIds(const CalculatorGraphConfig::Node& node_config,
                               const tool::TagMap& input_tag_map);

  void AddPacketInfoInternal(const PacketId& packet_id,
                             int64_t production_time_usec,
                             int64_t source_process_start_usec);
  // Adds packet info for non-empty output packets.
  void AddPacketInfoForOutputPackets(
      const OutputStreamShardSet& output_stream_shard_set,
      int64_t production_time_usec, int64_t source_process_start_usec);

  // Updates the production time for outputs and the stream profile for inputs.
  int64_t AddStreamLatencies(const CalculatorContext& calculator_context,
                             int64_t start_time_usec, int64_t end_time_usec,
                             CalculatorProfile* calculator_profile);

  void SetOpenRuntime(const CalculatorContext& calculator_context,
                      int64_t start_time_usec, int64_t end_time_usec)
      ABSL_LOCKS_EXCLUDED(profiler_mutex_);
  void SetCloseRuntime(const CalculatorContext& calculator_context,
                       int64_t start_time_usec, int64_t end_time_usec)
      ABSL_LOCKS_EXCLUDED(profiler_mutex_);

  // Updates the input streams profiles for the calculator and returns the
  // minimum |source_process_start_usec| of all input packets, excluding empty
  // packets and back-edge packets. Returns -1 if there are no input packets.
  int64_t AddInputStreamTimeSamples(const CalculatorContext& calculator_context,
                                    int64_t start_time_usec,
                                    CalculatorProfile* calculator_profile);

  // Updates the Process() data for calculator.
  // Requires ReaderLock for is_profiling_.
  // NOTE(review): is_profiling_ is declared as std::atomic_bool in this class,
  // so the ReaderLock requirement above may be stale -- confirm.
  void AddProcessSample(const CalculatorContext& calculator_context,
                        int64_t start_time_usec, int64_t end_time_usec)
      ABSL_LOCKS_EXCLUDED(profiler_mutex_);

  // Helper method to get trace_log_path.  If the trace_log_path is empty and
  // tracing is enabled, this function returns a default platform dependent
  // trace_log_path.
  absl::StatusOr<std::string> GetTraceLogPath();

  // Helper method to get the clock time in microseconds.
  int64_t TimeNowUsec() { return ToUnixMicros(clock_->TimeNow()); }

 private:
  // The settings for this tracer.
  ProfilerConfig profiler_config_;

  // If true, the profiler has already been initialized and should not be
  // initialized again.
  std::atomic_bool is_initialized_;

  // If true, the profiler is profiling. Otherwise, it is paused.
  std::atomic_bool is_profiling_;

  // If true, the tracer records timing events.
  std::atomic_bool is_tracing_;

  // Stores all the calculator profiles with the calculator name as the key.
  using CalculatorProfileMap = ShardedMap<std::string, CalculatorProfile>;
  CalculatorProfileMap calculator_profiles_;
  // Stores the production time of a packet, based on profiler's clock.
  using PacketInfoMap =
      ShardedMap<std::string, std::list<std::pair<int64_t, PacketInfo>>>;
  PacketInfoMap packets_info_;

  // Global mutex for the profiler.
  mutable absl::Mutex profiler_mutex_;

  // Buffer of recent profile trace events.
  std::unique_ptr<GraphTracer> packet_tracer_;

  // The clock for time measurement, which must be a monotonic real time clock.
  std::shared_ptr<mediapipe::Clock> clock_;

  // Indicates that profiling has started and not yet stopped.
  std::atomic_bool is_running_;

  // The end time of the previous output log.
  absl::Time previous_log_end_time_;

  // The index number of the previous output log.
  std::atomic<int> previous_log_index_;

  // The configuration for the graph being profiled.
  const ValidatedGraphConfig* validated_graph_;

  // A private resource for creating GraphProfiles.
  class GraphProfileBuilder;
  std::unique_ptr<GraphProfileBuilder> profile_builder_;

  // The globally incrementing identifier for all graphs in a process.
  static inline std::atomic_int next_instance_id_ = 0;

  // A unique identifier for this object. Only unique within a process.
  uint64_t graph_id_;

  // For testing.
  friend GraphProfilerTestPeer;
};

// The API class used to access the preferred profiler, such as
// GraphProfiler or GraphProfilerStub.  ProfilingContext is defined as
// a class rather than a typedef in order to support clients that refer
// to it only as a forward declaration, such as CalculatorState.
//
// It adds no members of its own; it only inherits GraphProfiler's
// constructors.
class ProfilingContext : public GraphProfiler {
  using GraphProfiler::GraphProfiler;
};

// For now, OSS always uses GlContextProfilerStub.
// TODO: Switch to GlContextProfiler when GlContext is moved to OSS.
#define MEDIAPIPE_DISABLE_GPU_PROFILER 1

// GlContextProfiler keeps track of all timestamp queries within a specific
// GlContext object. When created, the GlContextProfiler must be initialized
// before marking timestamps. Finally, when GlContext is no longer interested
// in marking timestamps or is about to be destroyed, Finish() must be called
// to complete all pending time queries and detach the timer from the GlContext.
// Note that the GlContextProfiler must be created and initialized within a
// valid GlContext object.
#if !MEDIAPIPE_DISABLE_GPU_PROFILER
// Profiler for GPU timestamp queries. This definition is compiled only when
// MEDIAPIPE_DISABLE_GPU_PROFILER evaluates to 0.
class GlContextProfiler {
 public:
  // Stores a shared reference to `profiling_context`.
  explicit GlContextProfiler(
      std::shared_ptr<ProfilingContext> profiling_context)
      : profiling_context_(profiling_context) {}

  // Not copyable or movable.
  GlContextProfiler(const GlContextProfiler&) = delete;
  GlContextProfiler& operator=(const GlContextProfiler&) = delete;

  // Add a GlTimingInfo object to the collection of pending timestamp queries
  // associated with a specific graph node_id, packet input_timestamp and mark
  // if it is a start or stop event. When a stop event is marked, this function
  // blocks on the corresponding start event to complete.
  void MarkTimestamp(int node_id, Timestamp input_timestamp, bool is_finish);

  // Complete all pending timing queries and detach the timer from the
  // GlContext.
  void LogAllTimestamps();

 private:
  // Store GlTimeQuery and the corresponding TraceEvent object that should be
  // populated when the query completes together.
  struct GlTimingInfo {
    GlTimeQuery time_query;
    TraceEvent trace_event;
  };

  // Set up the timer for marking GPU timestamps. Returns true if setup
  // succeeds; otherwise returns false to indicate that timing measurement is
  // not supported.
  bool Initialize();

  absl::Time TimeNow();

  // Calibrate the GPU timer w.r.t. the CPU clock. If calibration fails,
  // timing_measurement_supported_ is set to false.
  void CalibrateTimer(bool recalibrate);

  // Log a TraceEvent object to represent if the GPU calibration period has
  // started or just ended.
  void LogCalibrationEvent(bool started, absl::Time time);

  // Log TraceEvent objects for completed time queries. If the parameter wait is
  // set to true, wait for all time queries to complete before returning.
  void RetireReadyGlTimings(bool wait = false);

  // Get the TraceEvent object containing the timestamp recorded by the GPU if
  // the provided query was fulfilled. If it is still pending and wait is false,
  // return absl::nullopt.
  absl::optional<TraceEvent> GetTimeFromQuery(
      std::unique_ptr<GlTimingInfo>& query, bool wait);

  // Profiling context supplied at construction.
  std::shared_ptr<ProfilingContext> profiling_context_;
  GlSimpleTimer gl_timer_;
  bool checked_timing_supported_ = false;
  bool timing_measurement_supported_ = false;
  // Timestamp queries that have been issued but not yet retired.
  std::deque<std::unique_ptr<GlTimingInfo>> pending_gl_times_;
  std::unique_ptr<GlTimingInfo> gl_start_query_;
};

// The API class used to access the preferred GlContext profiler, such as
// GlContextProfiler or GlContextProfilerStub. GlProfilingHelper is defined as
// a class rather than a typedef in order to support clients that refer
// to it only as a forward declaration.
//
// This variant derives from GlContextProfiler and only inherits its
// constructors.
class GlProfilingHelper : public GlContextProfiler {
  using GlContextProfiler::GlContextProfiler;
};
#else   // MEDIAPIPE_DISABLE_GPU_PROFILER
// No-op stand-in for GlContextProfiler, compiled when
// MEDIAPIPE_DISABLE_GPU_PROFILER is non-zero. Every method has an empty body
// or returns a constant.
class GlContextProfilerStub {
 public:
  // Accepts a profiling context and ignores it.
  explicit GlContextProfilerStub(
      std::shared_ptr<ProfilingContext> profiling_context) {}
  // Not copyable or movable.
  GlContextProfilerStub(const GlContextProfilerStub&) = delete;
  GlContextProfilerStub& operator=(const GlContextProfilerStub&) = delete;
  // Always returns false.
  // NOTE(review): the name is spelled `Initialze` (missing an 'i'); renaming
  // it would change the class interface, so confirm there are no callers
  // before correcting the spelling.
  bool Initialze() { return false; }
  // Does nothing.
  void MarkTimestamp(int node_id, Timestamp input_timestamp, bool is_finish) {}
  // Does nothing.
  void LogAllTimestamps() {}
};
// GlProfilingHelper variant used when the GPU profiler is disabled: derives
// from the no-op GlContextProfilerStub and only inherits its constructors.
class GlProfilingHelper : public GlContextProfilerStub {
  using GlContextProfilerStub::GlContextProfilerStub;
};
#endif  // !MEDIAPIPE_DISABLE_GPU_PROFILER
#undef MEDIAPIPE_DISABLE_GPU_PROFILER
}  // namespace mediapipe

#endif  // MEDIAPIPE_FRAMEWORK_PROFILER_GRAPH_PROFILER_H_