chromium/third_party/mediapipe/src/mediapipe/framework/stream_handler/mux_input_stream_handler.cc

// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "mediapipe/framework/stream_handler/mux_input_stream_handler.h"

#include <utility>

#include "absl/log/absl_check.h"
#include "absl/strings/substitute.h"
#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/collection_item_id.h"
#include "mediapipe/framework/input_stream_handler.h"

namespace mediapipe {

CollectionItemId MuxInputStreamHandler::GetControlStreamId() const {
  return input_stream_managers_.EndId() - 1;
}
void MuxInputStreamHandler::RemoveOutdatedDataPackets(Timestamp timestamp) {
  const CollectionItemId control_stream_id = GetControlStreamId();
  for (CollectionItemId id = input_stream_managers_.BeginId();
       id < control_stream_id; ++id) {
    input_stream_managers_.Get(id)->ErasePacketsEarlierThan(timestamp);
  }
}

// In MuxInputStreamHandler, a node is "ready" if:
// - the control stream is done (need to call Close() in this case), or
// - we have received the packets on the control stream and the selected data
//   stream at the next timestamp.
NodeReadiness MuxInputStreamHandler::GetNodeReadiness(
    Timestamp* min_stream_timestamp) {
  ABSL_DCHECK(min_stream_timestamp);
  absl::MutexLock lock(&input_streams_mutex_);

  const auto& control_stream = input_stream_managers_.Get(GetControlStreamId());
  bool empty;
  *min_stream_timestamp = control_stream->MinTimestampOrBound(&empty);

  // Data streams may contain some outdated packets which failed to be popped
  // out during "FillInputSet". (This handler doesn't sync input streams,
  // hence "FillInputSet" can be triggered before every input stream is
  // filled with packets corresponding to the same timestamp.)
  RemoveOutdatedDataPackets(*min_stream_timestamp);
  if (empty) {
    if (*min_stream_timestamp == Timestamp::Done()) {
      // Calculator is done if the control input stream is done.
      return NodeReadiness::kReadyForClose;
    }
    // Calculator is not ready to run if the control input stream is empty.
    return NodeReadiness::kNotReady;
  }

  Packet control_packet = control_stream->QueueHead();
  ABSL_CHECK(!control_packet.IsEmpty());
  int control_value = control_packet.Get<int>();
  ABSL_CHECK_LE(0, control_value);
  ABSL_CHECK_LT(control_value, input_stream_managers_.NumEntries() - 1);
  const auto& data_stream = input_stream_managers_.Get(
      input_stream_managers_.BeginId() + control_value);

  Timestamp stream_timestamp = data_stream->MinTimestampOrBound(&empty);
  if (empty) {
    if (stream_timestamp <= *min_stream_timestamp) {
      // "data_stream" didn't receive a packet corresponding to the current
      // "control_stream" packet yet.
      return NodeReadiness::kNotReady;
    }
    // "data_stream" timestamp bound update detected.
    return NodeReadiness::kReadyForProcess;
  }
  if (stream_timestamp > *min_stream_timestamp) {
    // The earliest packet "data_stream" holds corresponds to a control packet
    // yet to arrive, which means there won't be a "data_stream" packet
    // corresponding to the current "control_stream" packet, which should be
    // indicated as timestamp boun update.
    return NodeReadiness::kReadyForProcess;
  }
  ABSL_CHECK_EQ(stream_timestamp, *min_stream_timestamp);
  return NodeReadiness::kReadyForProcess;
}

// Only invoked when associated GetNodeReadiness() returned kReadyForProcess.
void MuxInputStreamHandler::FillInputSet(Timestamp input_timestamp,
                                         InputStreamShardSet* input_set) {
  ABSL_CHECK(input_timestamp.IsAllowedInStream());
  ABSL_CHECK(input_set);
  absl::MutexLock lock(&input_streams_mutex_);

  const CollectionItemId control_stream_id = GetControlStreamId();
  auto& control_stream = input_stream_managers_.Get(control_stream_id);
  int num_packets_dropped = 0;
  bool stream_is_done = false;
  Packet control_packet = control_stream->PopPacketAtTimestamp(
      input_timestamp, &num_packets_dropped, &stream_is_done);
  ABSL_CHECK_EQ(num_packets_dropped, 0)
      << absl::Substitute("Dropped $0 packet(s) on input stream \"$1\".",
                          num_packets_dropped, control_stream->Name());
  ABSL_CHECK(!control_packet.IsEmpty());
  int control_value = control_packet.Get<int>();
  AddPacketToShard(&input_set->Get(control_stream_id),
                   std::move(control_packet), stream_is_done);

  const CollectionItemId data_stream_id =
      input_stream_managers_.BeginId() + control_value;
  ABSL_CHECK_LE(input_stream_managers_.BeginId(), data_stream_id);
  ABSL_CHECK_LT(data_stream_id, control_stream_id);
  auto& data_stream = input_stream_managers_.Get(data_stream_id);
  stream_is_done = false;
  Packet data_packet = data_stream->PopPacketAtTimestamp(
      input_timestamp, &num_packets_dropped, &stream_is_done);
  ABSL_CHECK_EQ(num_packets_dropped, 0)
      << absl::Substitute("Dropped $0 packet(s) on input stream \"$1\".",
                          num_packets_dropped, data_stream->Name());
  AddPacketToShard(&input_set->Get(data_stream_id), std::move(data_packet),
                   stream_is_done);

  // Discard old packets on data streams.
  RemoveOutdatedDataPackets(input_timestamp.NextAllowedInStream());
}

REGISTER_INPUT_STREAM_HANDLER(MuxInputStreamHandler);

}  // namespace mediapipe