// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/log/absl_log.h"
#include "absl/strings/str_cat.h"
#include "absl/time/time.h"
#include "mediapipe/calculators/util/latency.pb.h"
#include "mediapipe/calculators/util/packet_latency_calculator.pb.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/calculator_options.pb.h"
#include "mediapipe/framework/deps/clock.h"
#include "mediapipe/framework/deps/monotonic_clock.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/timestamp.h"
namespace mediapipe {
namespace {
// Tag of the optional input side packet that supplies the clock (declared in
// the contract as a std::shared_ptr<::mediapipe::Clock>).
constexpr char kClockTag[] = "CLOCK";
// Tag of the input stream that carries the reference signal.
constexpr char kReferenceSignalTag[] = "REFERENCE_SIGNAL";
} // namespace
// A MediaPipe calculator that computes the latency of incoming packet streams
// with respect to a reference signal (e.g. image or audio frames).
//
// The latency of a packet wrt a reference packet is defined as the difference
// between arrival times of the two. A latency of X microseconds implies that
// the packet arrived X microseconds after its corresponding reference packet.
// For each packet stream, the calculator outputs the current latency, average,
// and a histogram of observed latencies so far.
//
// NOTE:
// 1) This calculator is meant to be used ONLY with an
// ImmediateInputStreamHandler.
// 2) This calculator is meant to be used only for real-time or simulated real-
// time applications. For example, the reference signal could be audio/video
// frames coming from a calculator that reads microphone/webcam data or some
// calculator that simulates real-time input.
// 3) If packet labels are provided through options, the number of labels
// must be exactly the same as the number of packet streams (and hence of
// output streams). If no packet label is defined in the node options, the
// calculator uses the input stream names.
//
// InputSidePacket (Optional):
// CLOCK: A clock for knowing current time.
//
// Inputs:
// 0- Packet stream 0 (e.g. image features 0).
// 1- Packet stream 1 (e.g. image features 1).
// ...
// N- Packet stream N (e.g. image features N).
// REFERENCE_SIGNAL: The reference signal from which the above packets were
// extracted (e.g image frames).
//
// Outputs:
// 0- Latency of packet stream 0.
// 1- Latency of packet stream 1.
// ...
// N- Latency of packet stream N.
//
// Example config:
// node {
// calculator: "PacketLatencyCalculator"
// input_side_packet: "CLOCK:monotonic_clock"
// input_stream: "packet_stream_0"
// input_stream: "packet_stream_1"
// ...
// input_stream: "packet_stream_N"
// input_stream: "REFERENCE_SIGNAL:camera_frames"
// output_stream: "packet_latency_0"
// output_stream: "packet_latency_1"
// ...
// output_stream: "packet_latency_N"
// options {
// [mediapipe.PacketLatencyCalculatorOptions.ext] {
// num_intervals: 10
// interval_size_usec: 10000
// }
// }
// input_stream_handler {
// input_stream_handler: 'ImmediateInputStreamHandler'
// }
// }
class PacketLatencyCalculator : public CalculatorBase {
 public:
  // All members carry in-class initializers, so the compiler-generated
  // constructor is sufficient.
  PacketLatencyCalculator() = default;

  // Declares the packet streams, their latency outputs, the reference signal
  // and the optional clock side packet.
  static absl::Status GetContract(CalculatorContract* cc);

  // Validates the options, sizes the per-stream statistics and picks the clock.
  absl::Status Open(CalculatorContext* cc) override;

  // Updates and emits latency statistics for every packet stream that has a
  // packet in the current invocation.
  absl::Status Process(CalculatorContext* cc) override;

 private:
  // Resets the histogram and running average variables by initializing them to
  // zero.
  void ResetStatistics();

  // Calculator options.
  PacketLatencyCalculatorOptions options_;

  // Clock used to read the current time; set in Open().
  std::shared_ptr<::mediapipe::Clock> clock_;

  // Clock time (Unix microseconds) when the first reference packet was
  // received. Negative until a reference packet has been seen.
  int64_t first_process_time_usec_ = -1;

  // Input timestamp recorded when the first reference packet was received.
  int64_t first_reference_timestamp_usec_ = -1;

  // Number of packet streams, i.e. number of inputs excluding the reference
  // signal. Set in Open().
  int64_t num_packet_streams_ = -1;

  // Latency output for each packet stream.
  std::vector<PacketLatency> packet_latencies_;

  // Running sum and count of latencies for each packet stream. This is required
  // to compute the average latency.
  std::vector<int64_t> sum_latencies_usec_;
  std::vector<int64_t> num_latencies_;

  // Clock time (Unix microseconds) when the histogram and running average were
  // last reset.
  int64_t last_reset_time_usec_ = -1;
};
REGISTER_CALCULATOR(PacketLatencyCalculator);
// Declares the calculator interface.
//
// At least two input streams are required: the REFERENCE_SIGNAL stream plus
// one or more index-addressed packet streams. The number of output streams
// must equal the number of packet streams; output i carries a PacketLatency.
// The CLOCK input side packet is optional.
absl::Status PacketLatencyCalculator::GetContract(CalculatorContract* cc) {
  RET_CHECK_GT(cc->Inputs().NumEntries(), 1);

  // All inputs but the reference signal are packet streams, each of which is
  // paired with one latency output.
  const int64_t packet_stream_count = cc->Inputs().NumEntries() - 1;
  RET_CHECK_EQ(cc->Outputs().NumEntries(), packet_stream_count);

  // The reference signal may carry any packet type.
  cc->Inputs().Tag(kReferenceSignalTag).SetAny();

  // Packet streams may carry any type; outputs are always PacketLatency.
  for (int64_t stream = 0; stream != packet_stream_count; ++stream) {
    auto& input = cc->Inputs().Index(stream);
    auto& output = cc->Outputs().Index(stream);
    input.SetAny();
    output.Set<PacketLatency>();
  }

  // The clock is only declared when the graph actually provides it.
  auto& side_packets = cc->InputSidePackets();
  if (side_packets.HasTag(kClockTag)) {
    side_packets.Tag(kClockTag).Set<std::shared_ptr<::mediapipe::Clock>>();
  }
  return absl::OkStatus();
}
void PacketLatencyCalculator::ResetStatistics() {
// Initialize histogram with zero counts and set running average to zero.
for (int64_t i = 0; i < num_packet_streams_; ++i) {
for (int64_t interval_index = 0; interval_index < options_.num_intervals();
++interval_index) {
packet_latencies_[i].set_counts(interval_index, 0);
}
// Initialize the running sum and count to 0.
sum_latencies_usec_[i] = 0;
num_latencies_[i] = 0;
}
}
// Reads and validates the calculator options, allocates the per-stream
// statistics and selects the clock.
//
// Returns an error status when:
//   - packet labels are provided but their count differs from the number of
//     packet streams,
//   - `num_intervals` or `interval_size_usec` is not positive,
//   - the selected clock is null.
absl::Status PacketLatencyCalculator::Open(CalculatorContext* cc) {
  options_ = cc->Options<PacketLatencyCalculatorOptions>();
  // Every input except the reference signal is a packet stream.
  num_packet_streams_ = cc->Inputs().NumEntries() - 1;

  // Labels are optional, but when given there must be one per packet stream.
  const bool labels_provided = !options_.packet_labels().empty();
  if (labels_provided) {
    RET_CHECK_EQ(options_.packet_labels_size(), num_packet_streams_)
        << "Number of packet labels different from number of packet streams.";
  }

  // Check that histogram params are valid.
  RET_CHECK_GT(options_.num_intervals(), 0);
  RET_CHECK_GT(options_.interval_size_usec(), 0);

  // Initialize latency outputs for all streams.
  packet_latencies_.resize(num_packet_streams_);
  sum_latencies_usec_.resize(num_packet_streams_);
  num_latencies_.resize(num_packet_streams_);
  for (int64_t i = 0; i < num_packet_streams_; ++i) {
    // Initialize latency histograms with zero counts.
    packet_latencies_[i].set_num_intervals(options_.num_intervals());
    packet_latencies_[i].set_interval_size_usec(options_.interval_size_usec());
    packet_latencies_[i].mutable_counts()->Resize(options_.num_intervals(), 0);

    // Set the label for the stream. The packet labels are taken from options
    // (if provided). If not, the name of the i-th untagged input stream is
    // used.
    if (labels_provided) {
      packet_latencies_[i].set_label(options_.packet_labels(i));
    } else {
      int64_t input_stream_index = cc->Inputs().TagMap()->GetId("", i).value();
      packet_latencies_[i].set_label(
          cc->Inputs().TagMap()->Names()[input_stream_index]);
    }
  }

  // Use the clock from the CLOCK side packet when present; otherwise fall back
  // to a synchronized monotonic clock.
  if (cc->InputSidePackets().HasTag(kClockTag)) {
    clock_ = cc->InputSidePackets()
                 .Tag(kClockTag)
                 .Get<std::shared_ptr<::mediapipe::Clock>>();
  } else {
    clock_ = std::shared_ptr<::mediapipe::Clock>(
        ::mediapipe::MonotonicClock::CreateSynchronizedMonotonicClock());
  }
  // Process() dereferences the clock unconditionally, so fail here with a
  // status instead of crashing later on a null pointer.
  RET_CHECK(clock_ != nullptr) << "Clock must not be null.";
  return absl::OkStatus();
}
// Updates latency statistics for the current invocation.
//
// The first invocation that carries a reference packet records the clock time
// and the input timestamp; later clock readings are mapped onto the timestamp
// axis relative to that pair. Until then a warning is logged and nothing is
// emitted. For every packet stream with a packet present, the latency is the
// calibrated "now" minus the input timestamp; negative latencies are skipped.
// Otherwise the stream's current latency, histogram, running sum and average
// are updated and a copy of its PacketLatency is emitted at the input
// timestamp.
absl::Status PacketLatencyCalculator::Process(CalculatorContext* cc) {
  if (first_process_time_usec_ < 0) {
    // Not calibrated yet: calibrate if this invocation has a reference packet.
    if (!cc->Inputs().Tag(kReferenceSignalTag).IsEmpty()) {
      first_process_time_usec_ = absl::ToUnixMicros(clock_->TimeNow());
      first_reference_timestamp_usec_ = cc->InputTimestamp().Value();
      last_reset_time_usec_ = first_process_time_usec_;
    }
    // Still uncalibrated, so there is nothing to measure against.
    if (first_process_time_usec_ < 0) {
      ABSL_LOG(WARNING) << "No reference packet received.";
      return absl::OkStatus();
    }
  }

  // Periodically clear the statistics when a reset duration is configured.
  const auto reset_period_usec = options_.reset_duration_usec();
  if (reset_period_usec > 0) {
    const int64_t now_usec = absl::ToUnixMicros(clock_->TimeNow());
    const int64_t since_reset_usec = now_usec - last_reset_time_usec_;
    if (since_reset_usec >= reset_period_usec) {
      ResetStatistics();
      last_reset_time_usec_ = now_usec;
    }
  }

  for (int64_t stream = 0; stream < num_packet_streams_; ++stream) {
    // Only streams that delivered a packet in this invocation are updated.
    if (cc->Inputs().Index(stream).IsEmpty()) {
      continue;
    }
    PacketLatency& stats = packet_latencies_[stream];

    // Map the current clock reading onto the timestamp axis and compare it
    // with the timestamp of this invocation.
    const int64_t input_timestamp_usec = cc->InputTimestamp().Value();
    const int64_t now_usec = absl::ToUnixMicros(clock_->TimeNow());
    const int64_t calibrated_now_usec =
        (now_usec - first_process_time_usec_) + first_reference_timestamp_usec_;
    const int64_t latency_usec = calibrated_now_usec - input_timestamp_usec;

    // Invalid timestamps in input signals could result in negative latencies.
    if (latency_usec < 0) {
      continue;
    }

    stats.set_current_latency_usec(latency_usec);

    // Latencies past the last interval are counted in the last bucket.
    const int64_t raw_bucket = latency_usec / stats.interval_size_usec();
    const int64_t bucket = raw_bucket >= stats.num_intervals()
                               ? stats.num_intervals() - 1
                               : raw_bucket;
    stats.set_counts(bucket, stats.counts(bucket) + 1);

    // Running sum and average.
    sum_latencies_usec_[stream] += latency_usec;
    num_latencies_[stream] += 1;
    stats.set_avg_latency_usec(sum_latencies_usec_[stream] /
                               num_latencies_[stream]);
    stats.set_sum_latency_usec(sum_latencies_usec_[stream]);

    // Emit a snapshot of the statistics; the output stream takes ownership.
    auto snapshot = absl::make_unique<PacketLatency>(stats);
    cc->Outputs().Index(stream).Add(snapshot.release(), cc->InputTimestamp());
  }
  return absl::OkStatus();
}
} // namespace mediapipe