// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "mediapipe/calculators/core/packet_resampler_calculator.h"
#include <memory>
#include "absl/log/absl_check.h"
#include "absl/log/absl_log.h"
namespace {
// Reflect an integer against the lower and upper bound of an interval.
int64_t ReflectBetween(int64_t ts, int64_t ts_min, int64_t ts_max) {
if (ts < ts_min) return 2 * ts_min - ts - 1;
if (ts >= ts_max) return 2 * ts_max - ts - 1;
return ts;
}
// Creates a secure random number generator for use in ProcessWithJitter.
// If no secure random number generator can be constructed, the jitter
// option is disabled in order to mainatain a consistent security and
// consistent random seeding.
std::unique_ptr<RandomBase> CreateSecureRandom(const std::string& seed) {
RandomBase* result = nullptr;
return std::unique_ptr<RandomBase>(result);
}
} // namespace
namespace mediapipe {
REGISTER_CALCULATOR(PacketResamplerCalculator);
namespace {
constexpr char kSeedTag[] = "SEED";
constexpr char kVideoHeaderTag[] = "VIDEO_HEADER";
constexpr char kOptionsTag[] = "OPTIONS";
// Returns a TimestampDiff (assuming microseconds) corresponding to the
// given time in seconds.
TimestampDiff TimestampDiffFromSeconds(double seconds) {
return TimestampDiff(MathUtil::SafeRound<int64_t, double>(
seconds * Timestamp::kTimestampUnitsPerSecond));
}
} // namespace
absl::Status PacketResamplerCalculator::GetContract(CalculatorContract* cc) {
const auto& resampler_options =
cc->Options<PacketResamplerCalculatorOptions>();
if (cc->InputSidePackets().HasTag(kOptionsTag)) {
cc->InputSidePackets().Tag(kOptionsTag).Set<CalculatorOptions>();
}
CollectionItemId input_data_id = cc->Inputs().GetId("DATA", 0);
if (!input_data_id.IsValid()) {
input_data_id = cc->Inputs().GetId("", 0);
}
cc->Inputs().Get(input_data_id).SetAny();
if (cc->Inputs().HasTag(kVideoHeaderTag)) {
cc->Inputs().Tag(kVideoHeaderTag).Set<VideoHeader>();
}
CollectionItemId output_data_id = cc->Outputs().GetId("DATA", 0);
if (!output_data_id.IsValid()) {
output_data_id = cc->Outputs().GetId("", 0);
}
cc->Outputs().Get(output_data_id).SetSameAs(&cc->Inputs().Get(input_data_id));
if (cc->Outputs().HasTag(kVideoHeaderTag)) {
cc->Outputs().Tag(kVideoHeaderTag).Set<VideoHeader>();
}
if (resampler_options.jitter() != 0.0) {
RET_CHECK_GT(resampler_options.jitter(), 0.0);
RET_CHECK_LE(resampler_options.jitter(), 1.0);
RET_CHECK(cc->InputSidePackets().HasTag(kSeedTag));
cc->InputSidePackets().Tag(kSeedTag).Set<std::string>();
}
return absl::OkStatus();
}
absl::Status PacketResamplerCalculator::Open(CalculatorContext* cc) {
const auto resampler_options =
tool::RetrieveOptions(cc->Options<PacketResamplerCalculatorOptions>(),
cc->InputSidePackets(), "OPTIONS");
flush_last_packet_ = resampler_options.flush_last_packet();
jitter_ = resampler_options.jitter();
input_data_id_ = cc->Inputs().GetId("DATA", 0);
if (!input_data_id_.IsValid()) {
input_data_id_ = cc->Inputs().GetId("", 0);
}
output_data_id_ = cc->Outputs().GetId("DATA", 0);
if (!output_data_id_.IsValid()) {
output_data_id_ = cc->Outputs().GetId("", 0);
}
frame_rate_ = resampler_options.frame_rate();
start_time_ = resampler_options.has_start_time()
? Timestamp(resampler_options.start_time())
: Timestamp::Min();
end_time_ = resampler_options.has_end_time()
? Timestamp(resampler_options.end_time())
: Timestamp::Max();
round_limits_ = resampler_options.round_limits();
// The frame_rate has a default value of -1.0, so the user must set it!
RET_CHECK_LT(0, frame_rate_)
<< "The output frame rate must be greater than zero";
RET_CHECK_LE(frame_rate_, Timestamp::kTimestampUnitsPerSecond)
<< "The output frame rate must be smaller than "
<< Timestamp::kTimestampUnitsPerSecond;
frame_time_usec_ = static_cast<int64_t>(1000000.0 / frame_rate_);
jitter_usec_ = static_cast<int64_t>(1000000.0 * jitter_ / frame_rate_);
RET_CHECK_LE(jitter_usec_, frame_time_usec_);
video_header_.frame_rate = frame_rate_;
if (resampler_options.output_header() !=
PacketResamplerCalculatorOptions::NONE &&
!cc->Inputs().Get(input_data_id_).Header().IsEmpty()) {
if (resampler_options.output_header() ==
PacketResamplerCalculatorOptions::UPDATE_VIDEO_HEADER) {
video_header_ =
cc->Inputs().Get(input_data_id_).Header().Get<VideoHeader>();
video_header_.frame_rate = frame_rate_;
cc->Outputs()
.Get(output_data_id_)
.SetHeader(Adopt(new VideoHeader(video_header_)));
} else {
cc->Outputs()
.Get(output_data_id_)
.SetHeader(cc->Inputs().Get(input_data_id_).Header());
}
}
strategy_ = GetSamplingStrategy(resampler_options);
return strategy_->Open(cc);
}
absl::Status PacketResamplerCalculator::Process(CalculatorContext* cc) {
if (cc->InputTimestamp() == Timestamp::PreStream() &&
cc->Inputs().UsesTags() && cc->Inputs().HasTag(kVideoHeaderTag) &&
!cc->Inputs().Tag(kVideoHeaderTag).IsEmpty()) {
video_header_ = cc->Inputs().Tag(kVideoHeaderTag).Get<VideoHeader>();
video_header_.frame_rate = frame_rate_;
if (cc->Inputs().Get(input_data_id_).IsEmpty()) {
return absl::OkStatus();
}
}
MP_RETURN_IF_ERROR(strategy_->Process(cc));
last_packet_ = cc->Inputs().Get(input_data_id_).Value();
return absl::OkStatus();
}
absl::Status PacketResamplerCalculator::Close(CalculatorContext* cc) {
if (!cc->GraphStatus().ok()) {
return absl::OkStatus();
}
return strategy_->Close(cc);
}
std::unique_ptr<PacketResamplerStrategy>
PacketResamplerCalculator::GetSamplingStrategy(
const PacketResamplerCalculatorOptions& options) {
if (options.reproducible_sampling()) {
if (!options.jitter_with_reflection()) {
ABSL_LOG(WARNING)
<< "reproducible_sampling enabled w/ jitter_with_reflection "
"disabled. "
<< "reproducible_sampling always uses jitter with reflection, "
<< "Ignoring jitter_with_reflection setting.";
}
return absl::make_unique<ReproducibleJitterWithReflectionStrategy>(this);
}
if (options.jitter() == 0) {
return absl::make_unique<NoJitterStrategy>(this);
}
if (options.jitter_with_reflection()) {
return absl::make_unique<LegacyJitterWithReflectionStrategy>(this);
}
// With jitter and no reflection.
return absl::make_unique<JitterWithoutReflectionStrategy>(this);
}
Timestamp PacketResamplerCalculator::PeriodIndexToTimestamp(
int64_t index) const {
ABSL_CHECK_EQ(jitter_, 0.0);
ABSL_CHECK_NE(first_timestamp_, Timestamp::Unset());
return first_timestamp_ + TimestampDiffFromSeconds(index / frame_rate_);
}
int64_t PacketResamplerCalculator::TimestampToPeriodIndex(
Timestamp timestamp) const {
ABSL_CHECK_EQ(jitter_, 0.0);
ABSL_CHECK_NE(first_timestamp_, Timestamp::Unset());
return MathUtil::SafeRound<int64_t, double>(
(timestamp - first_timestamp_).Seconds() * frame_rate_);
}
void PacketResamplerCalculator::OutputWithinLimits(CalculatorContext* cc,
const Packet& packet) const {
TimestampDiff margin((round_limits_) ? frame_time_usec_ / 2 : 0);
if (packet.Timestamp() >= start_time_ - margin &&
packet.Timestamp() < end_time_ + margin) {
cc->Outputs().Get(output_data_id_).AddPacket(packet);
}
}
absl::Status LegacyJitterWithReflectionStrategy::Open(CalculatorContext* cc) {
const auto resampler_options =
tool::RetrieveOptions(cc->Options<PacketResamplerCalculatorOptions>(),
cc->InputSidePackets(), "OPTIONS");
if (resampler_options.output_header() !=
PacketResamplerCalculatorOptions::NONE) {
ABSL_LOG(WARNING)
<< "VideoHeader::frame_rate holds the target value and not "
"the actual value.";
}
if (calculator_->flush_last_packet_) {
ABSL_LOG(WARNING)
<< "PacketResamplerCalculatorOptions.flush_last_packet is "
"ignored, because we are adding jitter.";
}
const auto& seed = cc->InputSidePackets().Tag(kSeedTag).Get<std::string>();
random_ = CreateSecureRandom(seed);
if (random_ == nullptr) {
return absl::InvalidArgumentError(
"SecureRandom is not available. With \"jitter\" specified, "
"PacketResamplerCalculator processing cannot proceed.");
}
packet_reservoir_random_ = CreateSecureRandom(seed);
packet_reservoir_ =
std::make_unique<PacketReservoir>(packet_reservoir_random_.get());
return absl::OkStatus();
}
absl::Status LegacyJitterWithReflectionStrategy::Close(CalculatorContext* cc) {
if (!packet_reservoir_->IsEmpty()) {
ABSL_LOG(INFO) << "Emitting pack from reservoir.";
calculator_->OutputWithinLimits(cc, packet_reservoir_->GetSample());
}
return absl::OkStatus();
}
absl::Status LegacyJitterWithReflectionStrategy::Process(
CalculatorContext* cc) {
RET_CHECK_GT(cc->InputTimestamp(), Timestamp::PreStream());
if (packet_reservoir_->IsEnabled() &&
(first_timestamp_ == Timestamp::Unset() ||
(cc->InputTimestamp() - next_output_timestamp_min_).Value() >= 0)) {
auto curr_packet = cc->Inputs().Get(calculator_->input_data_id_).Value();
packet_reservoir_->AddSample(curr_packet);
}
if (first_timestamp_ == Timestamp::Unset()) {
first_timestamp_ = cc->InputTimestamp();
InitializeNextOutputTimestampWithJitter();
if (first_timestamp_ == next_output_timestamp_) {
calculator_->OutputWithinLimits(cc, cc->Inputs()
.Get(calculator_->input_data_id_)
.Value()
.At(next_output_timestamp_));
UpdateNextOutputTimestampWithJitter();
}
return absl::OkStatus();
}
if (calculator_->frame_time_usec_ <
(cc->InputTimestamp() - calculator_->last_packet_.Timestamp()).Value()) {
ABSL_LOG_FIRST_N(WARNING, 2)
<< "Adding jitter is not very useful when upsampling.";
}
while (true) {
const int64_t last_diff =
(next_output_timestamp_ - calculator_->last_packet_.Timestamp())
.Value();
RET_CHECK_GT(last_diff, 0);
const int64_t curr_diff =
(next_output_timestamp_ - cc->InputTimestamp()).Value();
if (curr_diff > 0) {
break;
}
calculator_->OutputWithinLimits(
cc, (std::abs(curr_diff) > last_diff
? calculator_->last_packet_
: cc->Inputs().Get(calculator_->input_data_id_).Value())
.At(next_output_timestamp_));
UpdateNextOutputTimestampWithJitter();
// From now on every time a packet is emitted the timestamp of the next
// packet becomes known; that timestamp is stored in next_output_timestamp_.
// The only exception to this rule is the packet emitted from Close() which
// can only happen when jitter_with_reflection is enabled but in this case
// next_output_timestamp_min_ is a non-decreasing lower bound of any
// subsequent packet.
const Timestamp timestamp_bound = next_output_timestamp_min_;
cc->Outputs()
.Get(calculator_->output_data_id_)
.SetNextTimestampBound(timestamp_bound);
}
return absl::OkStatus();
}
void LegacyJitterWithReflectionStrategy::
InitializeNextOutputTimestampWithJitter() {
next_output_timestamp_min_ = first_timestamp_;
next_output_timestamp_ =
first_timestamp_ +
random_->UnbiasedUniform64(calculator_->frame_time_usec_);
}
void LegacyJitterWithReflectionStrategy::UpdateNextOutputTimestampWithJitter() {
packet_reservoir_->Clear();
next_output_timestamp_min_ += calculator_->frame_time_usec_;
Timestamp next_output_timestamp_max_ =
next_output_timestamp_min_ + calculator_->frame_time_usec_;
next_output_timestamp_ +=
calculator_->frame_time_usec_ +
random_->UnbiasedUniform64(2 * calculator_->jitter_usec_ + 1) -
calculator_->jitter_usec_;
next_output_timestamp_ = Timestamp(ReflectBetween(
next_output_timestamp_.Value(), next_output_timestamp_min_.Value(),
next_output_timestamp_max_.Value()));
ABSL_CHECK_GE(next_output_timestamp_, next_output_timestamp_min_);
ABSL_CHECK_LT(next_output_timestamp_, next_output_timestamp_max_);
}
absl::Status ReproducibleJitterWithReflectionStrategy::Open(
CalculatorContext* cc) {
const auto resampler_options =
tool::RetrieveOptions(cc->Options<PacketResamplerCalculatorOptions>(),
cc->InputSidePackets(), "OPTIONS");
if (resampler_options.output_header() !=
PacketResamplerCalculatorOptions::NONE) {
ABSL_LOG(WARNING)
<< "VideoHeader::frame_rate holds the target value and not "
"the actual value.";
}
if (calculator_->flush_last_packet_) {
ABSL_LOG(WARNING)
<< "PacketResamplerCalculatorOptions.flush_last_packet is "
"ignored, because we are adding jitter.";
}
const auto& seed = cc->InputSidePackets().Tag(kSeedTag).Get<std::string>();
random_ = CreateSecureRandom(seed);
if (random_ == nullptr) {
return absl::InvalidArgumentError(
"SecureRandom is not available. With \"jitter\" specified, "
"PacketResamplerCalculator processing cannot proceed.");
}
return absl::OkStatus();
}
absl::Status ReproducibleJitterWithReflectionStrategy::Close(
CalculatorContext* cc) {
// If last packet is non-empty and a packet hasn't been emitted for this
// period, emit the last packet.
if (!calculator_->last_packet_.IsEmpty() && !packet_emitted_this_period_) {
calculator_->OutputWithinLimits(
cc, calculator_->last_packet_.At(next_output_timestamp_));
}
return absl::OkStatus();
}
absl::Status ReproducibleJitterWithReflectionStrategy::Process(
CalculatorContext* cc) {
RET_CHECK_GT(cc->InputTimestamp(), Timestamp::PreStream());
Packet current_packet = cc->Inputs().Get(calculator_->input_data_id_).Value();
if (calculator_->last_packet_.IsEmpty()) {
// last_packet is empty, this is the first packet of the stream.
InitializeNextOutputTimestamp(current_packet.Timestamp());
// If next_output_timestamp_ happens to fall before current_packet, emit
// current packet. Only a single packet can be emitted at the beginning
// of the stream.
if (next_output_timestamp_ < current_packet.Timestamp()) {
calculator_->OutputWithinLimits(
cc, current_packet.At(next_output_timestamp_));
packet_emitted_this_period_ = true;
}
return absl::OkStatus();
}
// Last packet is set, so we are mid-stream.
if (calculator_->frame_time_usec_ <
(current_packet.Timestamp() - calculator_->last_packet_.Timestamp())
.Value()) {
// Note, if the stream is upsampling, this could lead to the same packet
// being emitted twice. Upsampling and jitter doesn't make much sense
// but does technically work.
ABSL_LOG_FIRST_N(WARNING, 2)
<< "Adding jitter is not very useful when upsampling.";
}
// Since we may be upsampling, we need to iteratively advance the
// next_output_timestamp_ one period at a time until it reaches the period
// current_packet is in. During this process, last_packet and/or
// current_packet may be repeatly emitted.
UpdateNextOutputTimestamp(current_packet.Timestamp());
while (!packet_emitted_this_period_ &&
next_output_timestamp_ <= current_packet.Timestamp()) {
// last_packet < next_output_timestamp_ <= current_packet,
// so emit the closest packet.
Packet packet_to_emit =
current_packet.Timestamp() - next_output_timestamp_ <
next_output_timestamp_ - calculator_->last_packet_.Timestamp()
? current_packet
: calculator_->last_packet_;
calculator_->OutputWithinLimits(cc,
packet_to_emit.At(next_output_timestamp_));
packet_emitted_this_period_ = true;
// If we are upsampling, packet_emitted_this_period_ can be reset by
// the following UpdateNext and the loop will iterate.
UpdateNextOutputTimestamp(current_packet.Timestamp());
}
// Set the bounds on the output stream. Note, if we emitted a packet
// above, it will already be set at next_output_timestamp_ + 1, in which
// case we have to skip setting it.
if (cc->Outputs().Get(calculator_->output_data_id_).NextTimestampBound() <
next_output_timestamp_) {
cc->Outputs()
.Get(calculator_->output_data_id_)
.SetNextTimestampBound(next_output_timestamp_);
}
return absl::OkStatus();
}
void ReproducibleJitterWithReflectionStrategy::InitializeNextOutputTimestamp(
Timestamp current_timestamp) {
if (next_output_timestamp_min_ != Timestamp::Unset()) {
return;
}
next_output_timestamp_min_ = Timestamp(0);
next_output_timestamp_ =
Timestamp(GetNextRandom(calculator_->frame_time_usec_));
// While the current timestamp is ahead of the max (i.e. min + frame_time),
// fast-forward.
while (current_timestamp >=
next_output_timestamp_min_ + calculator_->frame_time_usec_) {
packet_emitted_this_period_ = true; // Force update...
UpdateNextOutputTimestamp(current_timestamp);
}
}
void ReproducibleJitterWithReflectionStrategy::UpdateNextOutputTimestamp(
Timestamp current_timestamp) {
if (packet_emitted_this_period_ &&
current_timestamp >=
next_output_timestamp_min_ + calculator_->frame_time_usec_) {
next_output_timestamp_min_ += calculator_->frame_time_usec_;
Timestamp next_output_timestamp_max_ =
next_output_timestamp_min_ + calculator_->frame_time_usec_;
next_output_timestamp_ += calculator_->frame_time_usec_ +
GetNextRandom(2 * calculator_->jitter_usec_ + 1) -
calculator_->jitter_usec_;
next_output_timestamp_ = Timestamp(ReflectBetween(
next_output_timestamp_.Value(), next_output_timestamp_min_.Value(),
next_output_timestamp_max_.Value()));
packet_emitted_this_period_ = false;
}
}
absl::Status JitterWithoutReflectionStrategy::Open(CalculatorContext* cc) {
const auto resampler_options =
tool::RetrieveOptions(cc->Options<PacketResamplerCalculatorOptions>(),
cc->InputSidePackets(), "OPTIONS");
if (resampler_options.output_header() !=
PacketResamplerCalculatorOptions::NONE) {
ABSL_LOG(WARNING)
<< "VideoHeader::frame_rate holds the target value and not "
"the actual value.";
}
if (calculator_->flush_last_packet_) {
ABSL_LOG(WARNING)
<< "PacketResamplerCalculatorOptions.flush_last_packet is "
"ignored, because we are adding jitter.";
}
const auto& seed = cc->InputSidePackets().Tag(kSeedTag).Get<std::string>();
random_ = CreateSecureRandom(seed);
if (random_ == nullptr) {
return absl::InvalidArgumentError(
"SecureRandom is not available. With \"jitter\" specified, "
"PacketResamplerCalculator processing cannot proceed.");
}
packet_reservoir_random_ = CreateSecureRandom(seed);
packet_reservoir_ =
absl::make_unique<PacketReservoir>(packet_reservoir_random_.get());
return absl::OkStatus();
}
absl::Status JitterWithoutReflectionStrategy::Close(CalculatorContext* cc) {
if (!packet_reservoir_->IsEmpty()) {
calculator_->OutputWithinLimits(cc, packet_reservoir_->GetSample());
}
return absl::OkStatus();
}
absl::Status JitterWithoutReflectionStrategy::Process(CalculatorContext* cc) {
RET_CHECK_GT(cc->InputTimestamp(), Timestamp::PreStream());
// Packet reservior is used to make sure there's an output for every period,
// e.g. partial period at the end of the stream.
if (packet_reservoir_->IsEnabled() &&
(calculator_->first_timestamp_ == Timestamp::Unset() ||
(cc->InputTimestamp() - next_output_timestamp_min_).Value() >= 0)) {
auto curr_packet = cc->Inputs().Get(calculator_->input_data_id_).Value();
packet_reservoir_->AddSample(curr_packet);
}
if (calculator_->first_timestamp_ == Timestamp::Unset()) {
calculator_->first_timestamp_ = cc->InputTimestamp();
InitializeNextOutputTimestamp();
if (calculator_->first_timestamp_ == next_output_timestamp_) {
calculator_->OutputWithinLimits(cc, cc->Inputs()
.Get(calculator_->input_data_id_)
.Value()
.At(next_output_timestamp_));
UpdateNextOutputTimestamp();
}
return absl::OkStatus();
}
if (calculator_->frame_time_usec_ <
(cc->InputTimestamp() - calculator_->last_packet_.Timestamp()).Value()) {
ABSL_LOG_FIRST_N(WARNING, 2)
<< "Adding jitter is not very useful when upsampling.";
}
while (true) {
const int64_t last_diff =
(next_output_timestamp_ - calculator_->last_packet_.Timestamp())
.Value();
RET_CHECK_GT(last_diff, 0);
const int64_t curr_diff =
(next_output_timestamp_ - cc->InputTimestamp()).Value();
if (curr_diff > 0) {
break;
}
calculator_->OutputWithinLimits(
cc, (std::abs(curr_diff) > last_diff
? calculator_->last_packet_
: cc->Inputs().Get(calculator_->input_data_id_).Value())
.At(next_output_timestamp_));
UpdateNextOutputTimestamp();
cc->Outputs()
.Get(calculator_->output_data_id_)
.SetNextTimestampBound(next_output_timestamp_);
}
return absl::OkStatus();
}
void JitterWithoutReflectionStrategy::InitializeNextOutputTimestamp() {
next_output_timestamp_min_ = calculator_->first_timestamp_;
next_output_timestamp_ = calculator_->first_timestamp_ +
calculator_->frame_time_usec_ * random_->RandFloat();
}
void JitterWithoutReflectionStrategy::UpdateNextOutputTimestamp() {
packet_reservoir_->Clear();
packet_reservoir_->Disable();
next_output_timestamp_ += calculator_->frame_time_usec_ *
((1.0 - calculator_->jitter_) +
2.0 * calculator_->jitter_ * random_->RandFloat());
}
absl::Status NoJitterStrategy::Open(CalculatorContext* cc) {
const auto resampler_options =
tool::RetrieveOptions(cc->Options<PacketResamplerCalculatorOptions>(),
cc->InputSidePackets(), "OPTIONS");
base_timestamp_ = resampler_options.has_base_timestamp()
? Timestamp(resampler_options.base_timestamp())
: Timestamp::Unset();
period_count_ = 0;
return absl::OkStatus();
}
absl::Status NoJitterStrategy::Close(CalculatorContext* cc) {
// Emit the last packet received if we have at least one packet, but
// haven't sent anything for its period.
if (calculator_->first_timestamp_ != Timestamp::Unset() &&
calculator_->flush_last_packet_ &&
calculator_->TimestampToPeriodIndex(
calculator_->last_packet_.Timestamp()) == period_count_) {
calculator_->OutputWithinLimits(
cc, calculator_->last_packet_.At(
calculator_->PeriodIndexToTimestamp(period_count_)));
}
return absl::OkStatus();
}
absl::Status NoJitterStrategy::Process(CalculatorContext* cc) {
RET_CHECK_GT(cc->InputTimestamp(), Timestamp::PreStream());
if (calculator_->first_timestamp_ == Timestamp::Unset()) {
// This is the first packet, initialize the first_timestamp_.
if (base_timestamp_ == Timestamp::Unset()) {
// Initialize first_timestamp_ with exactly the first packet timestamp.
calculator_->first_timestamp_ = cc->InputTimestamp();
} else {
// Initialize first_timestamp_ with the first packet timestamp
// aligned to the base_timestamp_.
int64_t first_index = MathUtil::SafeRound<int64_t, double>(
(cc->InputTimestamp() - base_timestamp_).Seconds() *
calculator_->frame_rate_);
calculator_->first_timestamp_ =
base_timestamp_ +
TimestampDiffFromSeconds(first_index / calculator_->frame_rate_);
}
if (cc->Outputs().UsesTags() && cc->Outputs().HasTag(kVideoHeaderTag)) {
cc->Outputs()
.Tag(kVideoHeaderTag)
.Add(new VideoHeader(calculator_->video_header_),
Timestamp::PreStream());
}
}
const Timestamp received_timestamp = cc->InputTimestamp();
const int64_t received_timestamp_idx =
calculator_->TimestampToPeriodIndex(received_timestamp);
// Only consider the received packet if it belongs to the current period
// (== period_count_) or to a newer one (> period_count_).
if (received_timestamp_idx >= period_count_) {
// Fill the empty periods until we are in the same index as the received
// packet.
while (received_timestamp_idx > period_count_) {
calculator_->OutputWithinLimits(
cc, calculator_->last_packet_.At(
calculator_->PeriodIndexToTimestamp(period_count_)));
++period_count_;
}
// Now, if the received packet has a timestamp larger than the middle of
// the current period, we can send a packet without waiting. We send the
// one closer to the middle.
Timestamp target_timestamp =
calculator_->PeriodIndexToTimestamp(period_count_);
if (received_timestamp >= target_timestamp) {
bool have_last_packet =
(calculator_->last_packet_.Timestamp() != Timestamp::Unset());
bool send_current =
!have_last_packet ||
(received_timestamp - target_timestamp <=
target_timestamp - calculator_->last_packet_.Timestamp());
if (send_current) {
calculator_->OutputWithinLimits(cc,
cc->Inputs()
.Get(calculator_->input_data_id_)
.Value()
.At(target_timestamp));
} else {
calculator_->OutputWithinLimits(
cc, calculator_->last_packet_.At(target_timestamp));
}
++period_count_;
}
// TODO: Add a mechanism to the framework to allow these packets
// to be output earlier (without waiting for a much later packet to
// arrive)
// Update the bound for the next packet.
cc->Outputs()
.Get(calculator_->output_data_id_)
.SetNextTimestampBound(
calculator_->PeriodIndexToTimestamp(period_count_));
}
return absl::OkStatus();
}
} // namespace mediapipe