chromium/chrome/services/sharing/nearby/nearby_connections_stream_buffer_manager.cc

// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/services/sharing/nearby/nearby_connections_stream_buffer_manager.h"

#include <utility>

#include "base/containers/contains.h"
#include "components/cross_device/logging/logging.h"
#include "third_party/nearby/src/internal/platform/exception.h"
#include "third_party/nearby/src/internal/platform/input_stream.h"

namespace nearby {
namespace connections {

NearbyConnectionsStreamBufferManager::PayloadWithBuffer::PayloadWithBuffer(
    Payload payload)
    : payload(std::move(payload)) {}

NearbyConnectionsStreamBufferManager::NearbyConnectionsStreamBufferManager() =
    default;

NearbyConnectionsStreamBufferManager::~NearbyConnectionsStreamBufferManager() =
    default;

void NearbyConnectionsStreamBufferManager::StartTrackingPayload(
    Payload payload) {
  int64_t payload_id = payload.GetId();
  CD_LOG(VERBOSE, Feature::NEARBY_INFRA)
      << "Starting to track stream payload with ID " << payload_id;

  id_to_payload_with_buffer_map_[payload_id] =
      std::make_unique<PayloadWithBuffer>(std::move(payload));
}

bool NearbyConnectionsStreamBufferManager::IsTrackingPayload(
    int64_t payload_id) const {
  return base::Contains(id_to_payload_with_buffer_map_, payload_id);
}

void NearbyConnectionsStreamBufferManager::StopTrackingFailedPayload(
    int64_t payload_id) {
  id_to_payload_with_buffer_map_.erase(payload_id);
  CD_LOG(VERBOSE, Feature::NEARBY_INFRA)
      << "Stopped tracking payload with ID " << payload_id << " "
      << "and cleared internal memory.";
}

void NearbyConnectionsStreamBufferManager::HandleBytesTransferred(
    int64_t payload_id,
    int64_t cumulative_bytes_transferred_so_far) {
  auto it = id_to_payload_with_buffer_map_.find(payload_id);
  if (it == id_to_payload_with_buffer_map_.end()) {
    CD_LOG(ERROR, Feature::NEARBY_INFRA)
        << "Attempted to handle stream bytes for payload with ID " << payload_id
        << ", but this payload was not being tracked.";
    return;
  }

  PayloadWithBuffer* payload_with_buffer = it->second.get();

  // We only need to read the new bytes which have not already been inserted
  // into the buffer.
  size_t bytes_to_read =
      cumulative_bytes_transferred_so_far - payload_with_buffer->buffer.size();

  InputStream* stream = payload_with_buffer->payload.AsStream();
  if (!stream) {
    CD_LOG(ERROR, Feature::NEARBY_INFRA)
        << "Payload with ID " << payload_id << " is not a stream "
        << "payload; transfer has failed.";
    StopTrackingFailedPayload(payload_id);
    return;
  }

  ExceptionOr<ByteArray> bytes = stream->Read(bytes_to_read);
  if (!bytes.ok()) {
    CD_LOG(ERROR, Feature::NEARBY_INFRA)
        << "Payload with ID " << payload_id << " encountered "
        << "exception while reading; transfer has failed.";
    StopTrackingFailedPayload(payload_id);
    return;
  }

  payload_with_buffer->buffer += static_cast<std::string>(bytes.result());
}

ByteArray
NearbyConnectionsStreamBufferManager::GetCompletePayloadAndStopTracking(
    int64_t payload_id) {
  auto it = id_to_payload_with_buffer_map_.find(payload_id);
  if (it == id_to_payload_with_buffer_map_.end()) {
    CD_LOG(ERROR, Feature::NEARBY_INFRA)
        << "Attempted to get complete payload with ID " << payload_id
        << ", but this payload was not being tracked.";
    return ByteArray();
  }

  ByteArray complete_payload(it->second->buffer);

  // Close stream and erase internal state before returning payload.
  it->second->payload.AsStream()->Close();
  id_to_payload_with_buffer_map_.erase(it);

  return complete_payload;
}

}  // namespace connections
}  // namespace nearby