// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "mediapipe/framework/profiler/graph_profiler.h"
#include <fstream>
#include <list>
#include <memory>
#include "absl/log/absl_check.h"
#include "absl/log/absl_log.h"
#include "absl/strings/substitute.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "mediapipe/framework/port/advanced_proto_lite_inc.h"
#include "mediapipe/framework/port/canonical_errors.h"
#include "mediapipe/framework/port/file_helpers.h"
#include "mediapipe/framework/port/proto_ns.h"
#include "mediapipe/framework/port/re2.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/profiler/profiler_resource_util.h"
#include "mediapipe/framework/tool/name_util.h"
#include "mediapipe/framework/tool/tag_map.h"
#include "mediapipe/framework/tool/validate_name.h"
namespace mediapipe {
using tool::TagMap;
namespace {
// Fallback number of WriteProfile() calls written to one log file when
// ProfilerConfig.trace_log_interval_count is 0.
constexpr int kDefaultLogIntervalCount = 10;
// Fallback number of rotating log files when ProfilerConfig.trace_log_count
// is 0.
constexpr int kDefaultLogFileCount = 2;
// File-name prefix appended to the default trace log directory.
constexpr char kDefaultLogFilePrefix[] = "mediapipe_trace_";
// The number of recent timestamps tracked for each input stream.
constexpr int kPacketInfoRecentCount = 400;
// Formats a PacketId as "stream_name: <name>, timestamp_usec: <usec>" for
// diagnostic log messages.
std::string PacketIdToString(const PacketId& packet_id) {
  const auto& name = packet_id.stream_name;
  const auto usec = packet_id.timestamp_usec;
  return absl::Substitute("stream_name: $0, timestamp_usec: $1", name, usec);
}
// Returns how many WriteProfile() calls go into one log file before a new
// file is started: ProfilerConfig.trace_log_interval_count, or
// kDefaultLogIntervalCount when that field is 0.
int GetLogIntervalCount(const ProfilerConfig& profiler_config) {
  const auto configured = profiler_config.trace_log_interval_count();
  if (configured == 0) {
    return kDefaultLogIntervalCount;
  }
  return configured;
}
// Returns how many log files are rotated through:
// ProfilerConfig.trace_log_count, or kDefaultLogFileCount when that field
// is 0.
int GetLogFileCount(const ProfilerConfig& profiler_config) {
  const auto configured = profiler_config.trace_log_count();
  if (configured == 0) {
    return kDefaultLogFileCount;
  }
  return configured;
}
// Returns true if aggregate timing data is recorded.
// Mirrors ProfilerConfig.enable_profiler; Resume() copies this into
// is_profiling_, which gates the histogram-recording methods.
bool IsProfilerEnabled(const ProfilerConfig& profiler_config) {
  return profiler_config.enable_profiler();
}
// Returns true if trace events are recorded.
// Mirrors ProfilerConfig.trace_enabled; Initialize() only creates the
// GraphTracer when this is true.
bool IsTracerEnabled(const ProfilerConfig& profiler_config) {
  return profiler_config.trace_enabled();
}
// Returns true if trace events are written to a log file, i.e. tracing is
// enabled and ProfilerConfig.trace_log_disabled is not set.
// Note that for now, file output is only for graph-trace and not for
// calculator-profile.
bool IsTraceLogEnabled(const ProfilerConfig& profiler_config) {
  if (!IsTracerEnabled(profiler_config)) {
    return false;
  }
  return !profiler_config.trace_log_disabled();
}
// Returns true if trace events are written periodically. This requires
// trace-log output, a non-null `tracer`, and a trace log interval other than
// -1 microseconds (apparently the "no periodic output" sentinel; confirm
// against GraphTracer).
bool IsTraceIntervalEnabled(const ProfilerConfig& profiler_config,
                            GraphTracer* tracer) {
  if (!IsTraceLogEnabled(profiler_config) || tracer == nullptr) {
    return false;
  }
  const int64_t interval_usec =
      absl::ToInt64Microseconds(tracer->GetTraceLogInterval());
  return interval_usec != -1;
}
// Per-stream history of recently produced packets: maps a stream name to a
// list of (timestamp_usec, PacketInfo) pairs, appended at the back and
// trimmed from the front by InsertPacketInfo, so the list is oldest-first.
using PacketInfoMap =
    ShardedMap<std::string, std::list<std::pair<int64_t, PacketInfo>>>;
// Inserts a PacketInfo into a PacketInfoMap.
// The entry is appended to the history for packet_id.stream_name (created on
// first use). The history is then trimmed from the front so that at most
// kPacketInfoRecentCount entries remain, discarding the oldest first.
void InsertPacketInfo(PacketInfoMap* map, const PacketId& packet_id,
                      const PacketInfo& packet_info) {
  auto stream_entry = map->find(packet_id.stream_name);
  if (stream_entry == map->end()) {
    // First packet seen on this stream: start an empty history.
    stream_entry = map->insert({packet_id.stream_name, {}}).first;
  }
  auto& history = stream_entry->second;
  history.emplace_back(packet_id.timestamp_usec, packet_info);
  while (history.size() > kPacketInfoRecentCount) {
    history.pop_front();
  }
}
// Returns a PacketInfo from a PacketInfoMap.
// Looks up the entry with packet_id's stream name and timestamp, preferring
// the most recently inserted match. Returns nullptr when the stream or
// timestamp is unknown. The pointer refers to an element stored in `map`.
PacketInfo* GetPacketInfo(PacketInfoMap* map, const PacketId& packet_id) {
  auto stream_entry = map->find(packet_id.stream_name);
  if (stream_entry == map->end()) {
    return nullptr;
  }
  auto& history = stream_entry->second;
  // Walk from the newest entry (back) towards the oldest (front).
  for (auto it = history.end(); it != history.begin();) {
    --it;
    if (it->first == packet_id.timestamp_usec) {
      return &it->second;
    }
  }
  return nullptr;
}
} // namespace
// Builds GraphProfile records from profiler timing data.
//
// Currently its only job is to decide which CalculatorProfiles belong in a
// captured GraphProfile. It uses the regular expression in
// ProfilerConfig.calculator_filter; an empty filter matches every calculator.
class GraphProfiler::GraphProfileBuilder {
 public:
  // `profiler` must be non-null; its profiler_config() is read once here to
  // compile the calculator filter.
  explicit GraphProfileBuilder(GraphProfiler* profiler)
      : profiler_(profiler),
        calculator_regex_(FilterPattern(profiler->profiler_config())) {}

  // Returns true if the profile's calculator name fully matches the filter.
  bool ProfileIncluded(const CalculatorProfile& p) const {
    return RE2::FullMatch(p.name(), calculator_regex_);
  }

 private:
  // Returns the configured calculator filter, or ".*" (match everything)
  // when none is configured. The final pattern is compiled exactly once in
  // the member initializer, instead of compiling ".*" and then assigning a
  // second regex over it.
  static std::string FilterPattern(const ProfilerConfig& config) {
    const auto& filter = config.calculator_filter();
    return filter.empty() ? std::string(".*") : filter;
  }

  GraphProfiler* profiler_;
  RE2 calculator_regex_;
};
// Creates a profiler that is neither initialized nor profiling yet.
// Initialize() must be called to register the graph's calculators.
// Both ShardedMaps are constructed with 1000 (presumably a capacity or
// shard-count hint; confirm against ShardedMap). previous_log_index_ starts at
// -1 so the first WriteProfile() uses index 0. The default clock is a
// synchronized monotonic clock and can be replaced via SetClock().
GraphProfiler::GraphProfiler()
    : is_initialized_(false),
      is_profiling_(false),
      calculator_profiles_(1000),
      packets_info_(1000),
      is_running_(false),
      previous_log_end_time_(absl::InfinitePast()),
      previous_log_index_(-1),
      validated_graph_(nullptr) {
  clock_ = std::shared_ptr<mediapipe::Clock>(
      mediapipe::MonotonicClock::CreateSynchronizedMonotonicClock());
}
// The destructor has no work of its own; members are destroyed normally.
GraphProfiler::~GraphProfiler() = default;
void GraphProfiler::Initialize(
const ValidatedGraphConfig& validated_graph_config) {
absl::WriterMutexLock lock(&profiler_mutex_);
validated_graph_ = &validated_graph_config;
ABSL_CHECK(!is_initialized_)
<< "Cannot initialize the profiler for the same graph multiple times.";
profiler_config_ = validated_graph_config.Config().profiler_config();
int64_t interval_size_usec = profiler_config_.histogram_interval_size_usec();
interval_size_usec = interval_size_usec ? interval_size_usec : 1000000;
int64_t num_intervals = profiler_config_.num_histogram_intervals();
num_intervals = num_intervals ? num_intervals : 1;
if (IsTracerEnabled(profiler_config_)) {
packet_tracer_ = absl::make_unique<GraphTracer>(profiler_config_);
}
for (int node_id = 0;
node_id < validated_graph_config.CalculatorInfos().size(); ++node_id) {
std::string node_name =
tool::CanonicalNodeName(validated_graph_config.Config(), node_id);
CalculatorProfile profile;
profile.set_name(node_name);
InitializeTimeHistogram(interval_size_usec, num_intervals,
profile.mutable_process_runtime());
if (profiler_config_.enable_stream_latency()) {
InitializeTimeHistogram(interval_size_usec, num_intervals,
profile.mutable_process_input_latency());
InitializeTimeHistogram(interval_size_usec, num_intervals,
profile.mutable_process_output_latency());
const CalculatorGraphConfig::Node& node_config =
validated_graph_config.Config().node(node_id);
InitializeOutputStreams(node_config);
InitializeInputStreams(node_config, interval_size_usec, num_intervals,
&profile);
}
auto iter = calculator_profiles_.insert({node_name, profile});
ABSL_CHECK(iter.second) << absl::Substitute(
"Calculator \"$0\" has already been added.", node_name);
}
profile_builder_ = std::make_unique<GraphProfileBuilder>(this);
graph_id_ = ++next_instance_id_;
is_initialized_ = true;
}
// Replaces the clock used for event timestamps and for the periodic
// trace-writer's sleeps. Aborts (ABSL_CHECK) if `clock` is null.
void GraphProfiler::SetClock(const std::shared_ptr<mediapipe::Clock>& clock) {
  absl::WriterMutexLock lock(&profiler_mutex_);
  ABSL_CHECK(clock) << "GraphProfiler::SetClock() is called with a nullptr.";
  clock_ = clock;
}
// Returns the clock currently used by the profiler. This is the synchronized
// monotonic clock created in the constructor, unless it was replaced via
// SetClock().
// NOTE(review): clock_ is read here without holding profiler_mutex_, while
// SetClock() assigns it under the writer lock, so a concurrent SetClock()
// would race with this read.
const std::shared_ptr<mediapipe::Clock> GraphProfiler::GetClock() const {
  return clock_;
}
// Suspends recording by clearing is_profiling_ and is_tracing_.
// is_profiling_ is checked by the histogram recorders (e.g.
// AddProcessSample). Resume() restores both flags from profiler_config_;
// Stop() calls this.
void GraphProfiler::Pause() {
  is_profiling_ = false;
  is_tracing_ = false;
}
// Re-enables recording according to profiler_config_. Start() calls this at
// the beginning of every graph run.
void GraphProfiler::Resume() {
  // is_profiling_ enables recording of performance stats.
  // is_tracing_ enables recording of timing events.
  // While the graph is running, these variables indicate
  // IsProfilerEnabled and IsTracerEnabled.
  is_profiling_ = IsProfilerEnabled(profiler_config_);
  is_tracing_ = IsTracerEnabled(profiler_config_);
}
void GraphProfiler::Reset() {
absl::WriterMutexLock lock(&profiler_mutex_);
for (auto iter = calculator_profiles_.begin();
iter != calculator_profiles_.end(); ++iter) {
CalculatorProfile* calculator_profile = &iter->second;
ResetTimeHistogram(calculator_profile->mutable_process_runtime());
ResetTimeHistogram(calculator_profile->mutable_process_input_latency());
ResetTimeHistogram(calculator_profile->mutable_process_output_latency());
for (auto& input_stream_profile :
*(calculator_profile->mutable_input_stream_profiles())) {
ResetTimeHistogram(input_stream_profile.mutable_latency());
}
}
}
// Begins profiling for a single graph run.
//
// Always calls Resume(). When tracing is active, periodic trace logging is
// configured (IsTraceIntervalEnabled) and an `executor` is supplied, it also:
//  * resolves the trace log path, returning its error if that fails;
//  * writes a "trace_writing_check" probe file at that path and logs whether
//    this succeeded (a failure is only logged);
//  * schedules a task on `executor` that calls WriteProfile() once per
//    GetTraceLogInterval() until Stop() clears is_running_.
// The task captures only a weak reference. Once it starts, it holds a strong
// reference until is_running_ becomes false.
absl::Status GraphProfiler::Start(mediapipe::Executor* executor) {
  Resume();
  if (!is_tracing_ || !IsTraceIntervalEnabled(profiler_config_, tracer()) ||
      executor == nullptr) {
    // No periodic output requested, or nothing to run it on.
    return absl::OkStatus();
  }

  // Inform the user via logging the path to the trace logs.
  MP_ASSIGN_OR_RETURN(std::string trace_log_path, GetTraceLogPath());
  // Check that we can actually write to it.
  const auto write_check =
      file::SetContents(absl::StrCat(trace_log_path, "trace_writing_check"),
                        "can write trace logs to this location");
  if (!write_check.ok()) {
    ABSL_LOG(ERROR) << "cannot write to trace_log_path: " << trace_log_path
                    << ": " << write_check;
  } else {
    ABSL_LOG(INFO) << "trace_log_path: " << trace_log_path;
  }

  is_running_ = true;
  std::weak_ptr<GraphProfiler> weak_profiler = weak_from_this();
  executor->Schedule([weak_profiler] {
    const std::shared_ptr<GraphProfiler> profiler = weak_profiler.lock();
    if (profiler == nullptr) {
      return;  // The profiler went away before the task started.
    }
    const auto next_deadline = [&profiler] {
      return profiler->clock_->TimeNow() +
             profiler->tracer()->GetTraceLogInterval();
    };
    absl::Time deadline = next_deadline();
    while (profiler->is_running_) {
      profiler->clock_->SleepUntil(deadline);
      deadline = next_deadline();
      // Stop() may have run while sleeping; it writes the final profile.
      if (profiler->is_running_) {
        profiler->WriteProfile().IgnoreError();
      }
    }
  });
  return absl::OkStatus();
}
// Ends profiling for a single graph run.
//
// Tells the periodic writer started by Start() to exit and pauses
// recording. When trace logging is enabled, it then writes a final profile
// and returns that write's error, if any.
absl::Status GraphProfiler::Stop() {
  is_running_ = false;
  Pause();
  if (!IsTraceLogEnabled(profiler_config_)) {
    return absl::OkStatus();
  }
  // Flush whatever was recorded since the last periodic write.
  MP_RETURN_IF_ERROR(WriteProfile());
  return absl::OkStatus();
}
void GraphProfiler::LogEvent(const TraceEvent& event) {
// Record event info in the event trace log.
if (packet_tracer_) {
if (event.event_type == GraphTrace::GPU_TASK ||
event.event_type == GraphTrace::GPU_CALIBRATION) {
packet_tracer_->LogEvent(event);
} else {
absl::Time time_now = clock_->TimeNow();
packet_tracer_->LogEvent(TraceEvent(event).set_event_time(time_now));
}
}
// Record event info in the profiling histograms.
if (event.event_type == GraphTrace::PROCESS && event.node_id == -1) {
AddPacketInfo(event);
}
}
void GraphProfiler::AddPacketInfo(const TraceEvent& packet_info) {
absl::ReaderMutexLock lock(&profiler_mutex_);
if (!is_profiling_) {
return;
}
Timestamp packet_timestamp = packet_info.input_ts;
std::string stream_name = *packet_info.stream_id;
if (!profiler_config_.enable_stream_latency()) {
return;
}
if (!packet_timestamp.IsRangeValue()) {
ABSL_LOG(WARNING) << absl::Substitute(
"Skipped adding packet info because the timestamp $0 for stream "
"\"$1\" is not valid.",
packet_timestamp.Value(), stream_name);
return;
}
int64_t production_time_usec =
profiler_config_.use_packet_timestamp_for_added_packet()
? packet_timestamp.Value()
: TimeNowUsec();
AddPacketInfoInternal(PacketId({stream_name, packet_timestamp.Value()}),
production_time_usec, production_time_usec);
}
// Appends a copy of every CalculatorProfile to `profiles`; existing elements
// are kept. Returns an error if Initialize() has not been called yet.
absl::Status GraphProfiler::GetCalculatorProfiles(
    std::vector<CalculatorProfile>* profiles) const {
  absl::ReaderMutexLock lock(&profiler_mutex_);
  RET_CHECK(is_initialized_)
      << "GetCalculatorProfiles can only be called after Initialize()";
  for (const auto& name_and_profile : calculator_profiles_) {
    profiles->emplace_back(name_and_profile.second);
  }
  return absl::OkStatus();
}
// Configures `histogram` with `num_intervals` buckets of `interval_size_usec`
// each, resizing its count list to match. Its total and all bucket counts
// are zeroed.
void GraphProfiler::InitializeTimeHistogram(int64_t interval_size_usec,
                                            int64_t num_intervals,
                                            TimeHistogram* histogram) {
  histogram->set_interval_size_usec(interval_size_usec);
  histogram->set_num_intervals(num_intervals);
  auto* bucket_counts = histogram->mutable_count();
  bucket_counts->Resize(num_intervals, /*value=*/0);
  ResetTimeHistogram(histogram);
}
// Hook for per-output-stream profile setup. It currently does nothing.
// Initialize() calls it for each node when enable_stream_latency is set.
void GraphProfiler::InitializeOutputStreams(
    const CalculatorGraphConfig::Node& node_config) {}
// Appends one StreamProfile per input stream of `node_config` to
// `calculator_profile`, in TagMap name order. Each profile gets:
//  * the stream name,
//  * a back_edge flag derived from node_config.input_stream_info, and
//  * a freshly initialized latency histogram.
// `.value()` fails hard if the node's input streams cannot be turned into a
// TagMap.
void GraphProfiler::InitializeInputStreams(
    const CalculatorGraphConfig::Node& node_config, int64_t interval_size_usec,
    int64_t num_intervals, CalculatorProfile* calculator_profile) {
  const std::shared_ptr<tool::TagMap> tag_map =
      TagMap::Create(node_config.input_stream()).value();
  const std::set<int> back_edges = GetBackEdgeIds(node_config, *tag_map);
  const auto stream_names = tag_map->Names();
  int stream_index = 0;
  for (const auto& stream_name : stream_names) {
    StreamProfile* stream_profile =
        calculator_profile->add_input_stream_profiles();
    stream_profile->set_name(stream_name);
    stream_profile->set_back_edge(back_edges.count(stream_index) > 0);
    InitializeTimeHistogram(interval_size_usec, num_intervals,
                            stream_profile->mutable_latency());
    ++stream_index;
  }
}
// Returns the `input_tag_map` ids of the inputs that are marked back_edge in
// node_config.input_stream_info. Aborts (CHECK) if such an entry's tag_index
// cannot be parsed or does not name an existing input stream.
std::set<int> GraphProfiler::GetBackEdgeIds(
    const CalculatorGraphConfig::Node& node_config,
    const TagMap& input_tag_map) {
  std::set<int> back_edge_ids;
  for (const auto& input_stream_info : node_config.input_stream_info()) {
    if (input_stream_info.back_edge()) {
      std::string tag;
      int index = 0;
      MEDIAPIPE_CHECK_OK(
          tool::ParseTagIndex(input_stream_info.tag_index(), &tag, &index))
          << absl::Substitute("Cannot parse TAG or index for the backedge \"$0\"",
                              input_stream_info.tag_index());
      ABSL_CHECK(0 <= index && index < input_tag_map.NumEntries(tag))
          << absl::Substitute(
                 "The input_stream_info for tag \"$0\" (index "
                 "$1) does not match any input_stream.",
                 tag, index);
      const int back_edge_id = input_tag_map.GetId(tag, index).value();
      back_edge_ids.insert(back_edge_id);
    }
  }
  return back_edge_ids;
}
// Sets the histogram's total and every bucket count to zero. The bucket
// layout (interval size and number of buckets) is left unchanged.
void GraphProfiler::ResetTimeHistogram(TimeHistogram* histogram) {
  histogram->set_total(0);
  for (int bucket = 0; bucket < histogram->count_size(); ++bucket) {
    histogram->set_count(bucket, 0);
  }
}
// Stores production bookkeeping for `packet_id` in packets_info_. The first
// PacketInfo field is always 0 here; the other two carry the given production
// and source-process-start times.
void GraphProfiler::AddPacketInfoInternal(const PacketId& packet_id,
                                          int64_t production_time_usec,
                                          int64_t source_process_start_usec) {
  InsertPacketInfo(
      &packets_info_, packet_id,
      PacketInfo{0, production_time_usec, source_process_start_usec});
}
// For every packet currently queued on each shard of
// `output_stream_shard_set`, records `production_time_usec` and
// `source_process_start_usec`, keyed by the shard's name and the packet's
// timestamp.
void GraphProfiler::AddPacketInfoForOutputPackets(
    const OutputStreamShardSet& output_stream_shard_set,
    int64_t production_time_usec, int64_t source_process_start_usec) {
  for (const OutputStreamShard& shard : output_stream_shard_set) {
    for (const Packet& packet : *shard.OutputQueue()) {
      const PacketId emitted{shard.Name(), packet.Timestamp().Value()};
      AddPacketInfoInternal(emitted, production_time_usec,
                            source_process_start_usec);
    }
  }
}
// Updates the per-input-stream latency histograms for this invocation. It
// then records `end_time_usec` as the production time of every packet the
// calculator emitted.
//
// Returns the minimum of `start_time_usec` and the source_process_start_usec
// of each input packet that had recorded info. The same value is stored as
// the source start of the emitted packets.
int64_t GraphProfiler::AddStreamLatencies(
    const CalculatorContext& calculator_context, int64_t start_time_usec,
    int64_t end_time_usec, CalculatorProfile* calculator_profile) {
  const int64_t earliest_source_start_usec = AddInputStreamTimeSamples(
      calculator_context, start_time_usec, calculator_profile);
  AddPacketInfoForOutputPackets(calculator_context.Outputs(), end_time_usec,
                                earliest_source_start_usec);
  return earliest_source_start_usec;
}
void GraphProfiler::SetOpenRuntime(const CalculatorContext& calculator_context,
int64_t start_time_usec,
int64_t end_time_usec) {
absl::ReaderMutexLock lock(&profiler_mutex_);
if (!is_profiling_) {
return;
}
const std::string& node_name = calculator_context.NodeName();
int64_t time_usec = end_time_usec - start_time_usec;
auto profile_iter = calculator_profiles_.find(node_name);
ABSL_CHECK(profile_iter != calculator_profiles_.end()) << absl::Substitute(
"Calculator \"$0\" has not been added during initialization.",
calculator_context.NodeName());
CalculatorProfile* calculator_profile = &profile_iter->second;
calculator_profile->set_open_runtime(time_usec);
if (profiler_config_.enable_stream_latency()) {
AddStreamLatencies(calculator_context, start_time_usec, end_time_usec,
calculator_profile);
}
}
void GraphProfiler::SetCloseRuntime(const CalculatorContext& calculator_context,
int64_t start_time_usec,
int64_t end_time_usec) {
absl::ReaderMutexLock lock(&profiler_mutex_);
if (!is_profiling_) {
return;
}
const std::string& node_name = calculator_context.NodeName();
int64_t time_usec = end_time_usec - start_time_usec;
auto profile_iter = calculator_profiles_.find(node_name);
ABSL_CHECK(profile_iter != calculator_profiles_.end()) << absl::Substitute(
"Calculator \"$0\" has not been added during initialization.",
calculator_context.NodeName());
CalculatorProfile* calculator_profile = &profile_iter->second;
calculator_profile->set_close_runtime(time_usec);
if (profiler_config_.enable_stream_latency()) {
AddStreamLatencies(calculator_context, start_time_usec, end_time_usec,
calculator_profile);
}
}
// Adds the duration (end_time_usec - start_time_usec) to `histogram`. The
// duration is added to the total, and the bucket it falls into is
// incremented; durations beyond the last bucket are counted in the last
// bucket. A negative duration is logged as an error and dropped.
void GraphProfiler::AddTimeSample(int64_t start_time_usec,
                                  int64_t end_time_usec,
                                  TimeHistogram* histogram) {
  if (end_time_usec < start_time_usec) {
    ABSL_LOG(ERROR) << absl::Substitute(
        "end_time_usec ($0) is < start_time_usec ($1)", end_time_usec,
        start_time_usec);
    return;
  }
  const int64_t elapsed_usec = end_time_usec - start_time_usec;
  histogram->set_total(histogram->total() + elapsed_usec);
  const int64_t raw_bucket = elapsed_usec / histogram->interval_size_usec();
  const int64_t last_bucket = histogram->num_intervals() - 1;
  const int64_t bucket = raw_bucket < last_bucket ? raw_bucket : last_bucket;
  histogram->set_count(bucket, histogram->count(bucket) + 1);
}
int64_t GraphProfiler::AddInputStreamTimeSamples(
const CalculatorContext& calculator_context, int64_t start_time_usec,
CalculatorProfile* calculator_profile) {
int64_t input_timestamp_usec = calculator_context.InputTimestamp().Value();
int64_t min_source_process_start_usec = start_time_usec;
int64_t input_stream_counter = -1;
for (CollectionItemId id = calculator_context.Inputs().BeginId();
id < calculator_context.Inputs().EndId(); ++id) {
++input_stream_counter;
if (calculator_context.Inputs().Get(id).Value().IsEmpty() ||
calculator_profile->input_stream_profiles(input_stream_counter)
.back_edge()) {
continue;
}
PacketId packet_id = {calculator_context.Inputs().Get(id).Name(),
input_timestamp_usec};
PacketInfo* packet_info = GetPacketInfo(&packets_info_, packet_id);
if (packet_info == nullptr) {
// This is a condition rather than a failure CHECK because
// under certain conditions the consumer calculator's Process()
// can start before the producer calculator's Process() is finished.
ABSL_LOG_FIRST_N(WARNING, 10) << "Expected packet info is missing for: "
<< PacketIdToString(packet_id);
continue;
}
AddTimeSample(
packet_info->production_time_usec, start_time_usec,
calculator_profile->mutable_input_stream_profiles(input_stream_counter)
->mutable_latency());
min_source_process_start_usec = std::min(
min_source_process_start_usec, packet_info->source_process_start_usec);
}
return min_source_process_start_usec;
}
void GraphProfiler::AddProcessSample(
const CalculatorContext& calculator_context, int64_t start_time_usec,
int64_t end_time_usec) {
absl::ReaderMutexLock lock(&profiler_mutex_);
if (!is_profiling_) {
return;
}
const std::string& node_name = calculator_context.NodeName();
auto profile_iter = calculator_profiles_.find(node_name);
ABSL_CHECK(profile_iter != calculator_profiles_.end()) << absl::Substitute(
"Calculator \"$0\" has not been added during initialization.",
calculator_context.NodeName());
CalculatorProfile* calculator_profile = &profile_iter->second;
// Update Process() runtime.
AddTimeSample(start_time_usec, end_time_usec,
calculator_profile->mutable_process_runtime());
if (profiler_config_.enable_stream_latency()) {
int64_t min_source_process_start_usec = AddStreamLatencies(
calculator_context, start_time_usec, end_time_usec, calculator_profile);
// Update input and output trace latencies.
AddTimeSample(min_source_process_start_usec, start_time_usec,
calculator_profile->mutable_process_input_latency());
AddTimeSample(min_source_process_start_usec, end_time_usec,
calculator_profile->mutable_process_output_latency());
}
}
std::unique_ptr<GlProfilingHelper> GraphProfiler::CreateGlProfilingHelper() {
if (!IsTracerEnabled(profiler_config_)) {
return nullptr;
}
return absl::make_unique<mediapipe::GlProfilingHelper>(shared_from_this());
}
// A simple ZeroCopyOutputStream that writes to a std::ostream.
//
// Next() hands out a fixed 1 KiB internal buffer. The bytes the caller put
// in the previously handed-out buffer are written to the ostream on each
// Next() call, and once more when this object is destroyed. Next() reports
// write failures through ostream::good(). The destructor's final flush
// cannot report failure, so callers should check the ostream state after
// destroying this object.
class OstreamStream : public proto_ns::io::ZeroCopyOutputStream {
 public:
  explicit OstreamStream(std::ostream* output) : output_(output) {}
  ~OstreamStream() override { WriteBuffer(); }

  OstreamStream(const OstreamStream&) = delete;
  OstreamStream& operator=(const OstreamStream&) = delete;

  bool Next(void** data, int* size) override {
    // Flush what the caller wrote into the previous buffer, then hand the
    // same (now free) buffer out again in full.
    const bool ok = WriteBuffer();
    *data = buffer_;
    *size = static_cast<int>(sizeof(buffer_));
    return ok;
  }

  void BackUp(int count) override { buffer_used_ -= count; }

  // Total bytes accepted since construction: bytes already flushed plus the
  // bytes of the currently handed-out buffer, net of BackUp(). The
  // ZeroCopyOutputStream contract requires this; previously only flushed
  // bytes were counted.
  int64_t ByteCount() const override { return position_ + buffer_used_; }

 private:
  // Writes the buffer to the ostream.
  bool WriteBuffer() {
    output_->write(buffer_, buffer_used_);
    position_ += buffer_used_;
    buffer_used_ = static_cast<int>(sizeof(buffer_));
    return output_->good();
  }

  std::ostream* output_;
  char buffer_[1024];
  // Bytes of buffer_ that belong to the caller and are pending a write.
  int buffer_used_ = 0;
  // Bytes written to output_ so far. 64-bit so large logs cannot overflow.
  int64_t position_ = 0;
};
// Sets the canonical node name in each CalculatorGraphConfig::Node
// and also in the GraphTrace if present.
//
// All canonical names are computed before any node is renamed (presumably
// because CanonicalNodeName depends on the other nodes' current names).
// If `profile` has a graph_trace, the first trace's calculator_name list is
// replaced by those names, one per node index.
void AssignNodeNames(GraphProfile* profile) {
  CalculatorGraphConfig* graph_config = profile->mutable_config();
  const int node_count = graph_config->node().size();
  std::vector<std::string> names;
  names.reserve(node_count);
  for (int node = 0; node < node_count; ++node) {
    names.push_back(CanonicalNodeName(*graph_config, node));
  }
  for (int node = 0; node < node_count; ++node) {
    graph_config->mutable_node(node)->set_name(names[node]);
  }
  if (profile->graph_trace_size() == 0) {
    return;
  }
  GraphTrace* graph_trace = profile->mutable_graph_trace(0);
  graph_trace->clear_calculator_name();
  graph_trace->mutable_calculator_name()->Assign(names.begin(), names.end());
}
// Clears fields containing their default values: a single interval and a
// 1000000 usec interval size (the fallbacks used by Initialize()). Cleared
// fields are omitted when the profile is serialized.
void CleanTimeHistogram(TimeHistogram* histogram) {
  const bool has_default_interval_count = histogram->num_intervals() == 1;
  const bool has_default_interval_size =
      histogram->interval_size_usec() == 1000000;
  if (has_default_interval_count) {
    histogram->clear_num_intervals();
  }
  if (has_default_interval_size) {
    histogram->clear_interval_size_usec();
  }
}
// Clears fields containing their default values in every histogram of every
// CalculatorProfile in `profile`: process runtime, process input/output
// latency, and each input stream's latency.
void CleanCalculatorProfiles(GraphProfile* profile) {
  for (CalculatorProfile& calculator : *profile->mutable_calculator_profiles()) {
    for (TimeHistogram* histogram :
         {calculator.mutable_process_runtime(),
          calculator.mutable_process_input_latency(),
          calculator.mutable_process_output_latency()}) {
      CleanTimeHistogram(histogram);
    }
    for (StreamProfile& stream : *calculator.mutable_input_stream_profiles()) {
      CleanTimeHistogram(stream.mutable_latency());
    }
  }
}
// Returns the trace log path prefix. This is ProfilerConfig.trace_log_path
// when set, otherwise "<default trace log directory>/mediapipe_trace_".
// Returns an InternalError when trace logging is disabled, and propagates
// any error from GetDefaultTraceLogDirectory().
absl::StatusOr<std::string> GraphProfiler::GetTraceLogPath() {
  if (!IsTraceLogEnabled(profiler_config_)) {
    return absl::InternalError(
        "Trace log writing is disabled, unable to get trace_log_path.");
  }
  const std::string& configured_path = profiler_config_.trace_log_path();
  if (!configured_path.empty()) {
    return configured_path;
  }
  MP_ASSIGN_OR_RETURN(std::string directory_path,
                      GetDefaultTraceLogDirectory());
  return absl::StrCat(directory_path, "/", kDefaultLogFilePrefix);
}
// Fills `result` with the data gathered since the previous capture:
//  * a GraphTrace, when a tracer exists, covering previous_log_end_time_ up
//    to now minus trace_log_margin_usec. It is built with GetLog() when
//    trace_log_instant_events is set and with GetTrace() otherwise.
//  * a copy of each CalculatorProfile accepted by the calculator filter.
//    All live histograms are reset afterwards, and default histogram fields
//    are cleared in the copies.
//  * the full graph config with canonical node names, if `populate_config`
//    is kFull.
// Returns the status of GetCalculatorProfiles(), which is an error if
// Initialize() was never called. The remaining steps still run in that case.
absl::Status GraphProfiler::CaptureProfile(
    GraphProfile* result, PopulateGraphConfig populate_config) {
  // The window ends trace_log_margin_usec in the past, giving in-flight
  // events time to be appended to the TraceBuffer.
  const absl::Time end_time =
      clock_->TimeNow() -
      absl::Microseconds(profiler_config_.trace_log_margin_usec());
  if (GraphTracer* graph_tracer = tracer()) {
    GraphTrace* trace = result->add_graph_trace();
    if (profiler_config_.trace_log_instant_events()) {
      graph_tracer->GetLog(previous_log_end_time_, end_time, trace);
    } else {
      graph_tracer->GetTrace(previous_log_end_time_, end_time, trace);
    }
  }
  previous_log_end_time_ = end_time;

  // Record the latest CalculatorProfiles.
  std::vector<CalculatorProfile> profiles;
  const absl::Status status = GetCalculatorProfiles(&profiles);
  for (CalculatorProfile& calculator_profile : profiles) {
    if (profile_builder_->ProfileIncluded(calculator_profile)) {
      *result->add_calculator_profiles() = std::move(calculator_profile);
    }
  }
  Reset();
  CleanCalculatorProfiles(result);
  if (populate_config == PopulateGraphConfig::kFull) {
    *result->mutable_config() = validated_graph_->Config();
    AssignNodeNames(result);
  }
  return status;
}
// Appends the data captured since the previous call (see CaptureProfile) to
// the current trace log file as a serialized GraphProfile.
//
// Files are named "<trace_log_path><index>.binarypb". Every
// `log_interval_count` calls a new file is started: it is truncated and gets
// the graph config once. The index cycles through `log_file_count` files.
// While tracing is active, a capture with no calculator trace events is not
// written.
//
// Returns OK without writing when trace logging is disabled. Returns an error
// if the path cannot be determined, the capture fails, or any part of the
// write (including the final flush and close) fails.
absl::Status GraphProfiler::WriteProfile() {
  if (profiler_config_.trace_log_disabled()) {
    // Logging is disabled, so we can exit writing without error.
    return absl::OkStatus();
  }
  MP_ASSIGN_OR_RETURN(std::string trace_log_path, GetTraceLogPath());
  const int log_interval_count = GetLogIntervalCount(profiler_config_);
  const int log_file_count = GetLogFileCount(profiler_config_);
  GraphProfile profile;
  MP_RETURN_IF_ERROR(CaptureProfile(&profile, PopulateGraphConfig::kNo));

  // If there are no trace events, skip log writing. A capture without any
  // GraphTrace has no events either; checking the size first avoids
  // dereferencing the last element of an empty list.
  const bool has_trace_events =
      profile.graph_trace_size() > 0 &&
      !profile.graph_trace(profile.graph_trace_size() - 1)
           .calculator_trace()
           .empty();
  if (is_tracing_ && !has_trace_events) {
    return absl::OkStatus();
  }

  // Record the CalculatorGraphConfig, once per log file.
  // Effective index should not change during the call, and thus should be
  // stored in a local variable after the increment. Otherwise `is_new_file` and
  // `log_index` may evaluate to incoherent values depending on spurious
  // `previous_log_index_` increments by other threads.
  const int previous_log_index = ++previous_log_index_;
  const bool is_new_file = (previous_log_index % log_interval_count == 0);
  if (is_new_file) {
    *profile.mutable_config() = validated_graph_->Config();
    AssignNodeNames(&profile);
  }

  // Write the GraphProfile to the trace_log_path.
  const int log_index =
      previous_log_index / log_interval_count % log_file_count;
  const std::string log_path =
      absl::StrCat(trace_log_path, log_index, ".binarypb");
  std::ofstream ofs(log_path, std::ofstream::out | (is_new_file
                                                        ? std::ofstream::trunc
                                                        : std::ofstream::app));
  {
    // `out` writes its last buffered bytes to `ofs` when it is destroyed at
    // the end of this scope.
    OstreamStream out(&ofs);
    RET_CHECK(profile.SerializeToZeroCopyStream(&out))
        << "Could not write binary GraphProfile to: " << log_path;
  }
  // The final buffer flush and the file close happen after serialization
  // returned, so check them too; otherwise a truncated log would be reported
  // as success.
  ofs.close();
  RET_CHECK(!ofs.fail()) << "Could not write binary GraphProfile to: "
                         << log_path;
  return absl::OkStatus();
}
} // namespace mediapipe