chromium/chrome/browser/nearby_sharing/incoming_frames_reader.cc

// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/browser/nearby_sharing/incoming_frames_reader.h"

#include <type_traits>

#include "base/task/sequenced_task_runner.h"
#include "chromeos/ash/components/nearby/common/connections_manager/nearby_connection.h"
#include "chromeos/ash/services/nearby/public/mojom/nearby_decoder.mojom.h"
#include "components/cross_device/logging/logging.h"

namespace {

std::ostream& operator<<(std::ostream& out,
                         const sharing::mojom::V1Frame::Tag& obj) {
  out << static_cast<std::underlying_type<sharing::mojom::V1Frame::Tag>::type>(
      obj);
  return out;
}

}  // namespace

IncomingFramesReader::IncomingFramesReader(
    ash::nearby::NearbyProcessManager* process_manager,
    NearbyConnection* connection)
    : process_manager_(process_manager), connection_(connection) {
  DCHECK(process_manager);
  DCHECK(connection);
}

IncomingFramesReader::~IncomingFramesReader() = default;

void IncomingFramesReader::ReadFrame(
    base::OnceCallback<void(std::optional<sharing::mojom::V1FramePtr>)>
        callback) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!callback_);
  DCHECK(!is_process_stopped_);

  callback_ = std::move(callback);
  frame_type_ = std::nullopt;

  // Check in cache for frame.
  std::optional<sharing::mojom::V1FramePtr> cached_frame =
      GetCachedFrame(frame_type_);
  if (cached_frame) {
    Done(std::move(cached_frame));
    return;
  }

  ReadNextFrame();
}

void IncomingFramesReader::ReadFrame(
    sharing::mojom::V1Frame::Tag frame_type,
    base::OnceCallback<void(std::optional<sharing::mojom::V1FramePtr>)>
        callback,
    base::TimeDelta timeout) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!callback_);
  DCHECK(!is_process_stopped_);
  if (!connection_) {
    std::move(callback).Run(std::nullopt);
    return;
  }

  callback_ = std::move(callback);
  frame_type_ = frame_type;

  timeout_callback_.Reset(base::BindOnce(&IncomingFramesReader::OnTimeout,
                                         weak_ptr_factory_.GetWeakPtr()));
  base::SequencedTaskRunner::GetCurrentDefault()->PostDelayedTask(
      FROM_HERE, base::BindOnce(timeout_callback_.callback()), timeout);

  // Check in cache for frame.
  std::optional<sharing::mojom::V1FramePtr> cached_frame =
      GetCachedFrame(frame_type_);
  if (cached_frame) {
    Done(std::move(cached_frame));
    return;
  }

  ReadNextFrame();
}

void IncomingFramesReader::OnNearbyProcessStopped(
    ash::nearby::NearbyProcessManager::NearbyProcessShutdownReason) {
  is_process_stopped_ = true;
  Done(std::nullopt);
}

void IncomingFramesReader::ReadNextFrame() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  connection_->Read(
      base::BindOnce(&IncomingFramesReader::OnDataReadFromConnection,
                     weak_ptr_factory_.GetWeakPtr()));
}

void IncomingFramesReader::OnTimeout() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  CD_LOG(WARNING, Feature::NS)
      << __func__ << ": Timed out reading from NearbyConnection.";
  Done(std::nullopt);
}

void IncomingFramesReader::OnDataReadFromConnection(
    std::optional<std::vector<uint8_t>> bytes) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  if (!callback_) {
    return;
  }

  if (!bytes) {
    CD_LOG(WARNING, Feature::NS) << __func__ << ": Failed to read frame";
    Done(std::nullopt);
    return;
  }

  sharing::mojom::NearbySharingDecoder* decoder =
      GetOrStartNearbySharingDecoder();

  if (!decoder) {
    CD_LOG(WARNING, Feature::NS)
        << __func__
        << ": Cannot decode frame. Not currently bound to nearby process";
    Done(std::nullopt);
    return;
  }

  decoder->DecodeFrame(*bytes,
                       base::BindOnce(&IncomingFramesReader::OnFrameDecoded,
                                      weak_ptr_factory_.GetWeakPtr()));
}

void IncomingFramesReader::OnFrameDecoded(sharing::mojom::FramePtr frame) {
  if (!frame) {
    ReadNextFrame();
    return;
  }

  if (!frame->is_v1()) {
    CD_LOG(VERBOSE, Feature::NS)
        << __func__ << ": Frame read does not have V1Frame";
    ReadNextFrame();
    return;
  }

  sharing::mojom::V1FramePtr v1_frame(std::move(frame->get_v1()));
  sharing::mojom::V1Frame::Tag v1_frame_type = v1_frame->which();
  if (frame_type_ && *frame_type_ != v1_frame_type) {
    CD_LOG(WARNING, Feature::NS)
        << __func__ << ": Failed to read frame of type " << *frame_type_
        << ", but got frame of type " << v1_frame_type << ". Cached for later.";
    cached_frames_.insert({v1_frame_type, std::move(v1_frame)});
    ReadNextFrame();
    return;
  }

  CD_LOG(VERBOSE, Feature::NS)
      << __func__ << ": Successfully read frame of type " << v1_frame_type;
  Done(std::move(v1_frame));
}

void IncomingFramesReader::Done(
    std::optional<sharing::mojom::V1FramePtr> frame) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  frame_type_ = std::nullopt;
  timeout_callback_.Cancel();
  if (callback_) {
    std::move(callback_).Run(std::move(frame));
  }
}

std::optional<sharing::mojom::V1FramePtr> IncomingFramesReader::GetCachedFrame(
    std::optional<sharing::mojom::V1Frame::Tag> frame_type) {
  CD_LOG(VERBOSE, Feature::NS) << __func__ << ": Fetching cached frame";
  if (frame_type)
    CD_LOG(VERBOSE, Feature::NS)
        << __func__ << ": Requested frame type - " << *frame_type;

  auto iter =
      frame_type ? cached_frames_.find(*frame_type) : cached_frames_.begin();

  if (iter == cached_frames_.end())
    return std::nullopt;

  CD_LOG(VERBOSE, Feature::NS)
      << __func__ << ": Successfully read cached frame";
  sharing::mojom::V1FramePtr frame = std::move(iter->second);
  cached_frames_.erase(iter);
  return frame;
}

sharing::mojom::NearbySharingDecoder*
IncomingFramesReader::GetOrStartNearbySharingDecoder() {
  if (!process_reference_) {
    process_reference_ = process_manager_->GetNearbyProcessReference(
        base::BindOnce(&IncomingFramesReader::OnNearbyProcessStopped,
                       weak_ptr_factory_.GetWeakPtr()));

    if (!process_reference_) {
      CD_LOG(WARNING, Feature::NS)
          << __func__ << "Failed to get a reference to the nearby process.";
      is_process_stopped_ = true;
      return nullptr;
    }
  }

  is_process_stopped_ = false;

  sharing::mojom::NearbySharingDecoder* decoder =
      process_reference_->GetNearbySharingDecoder().get();

  if (!decoder)
    CD_LOG(WARNING, Feature::NS)
        << __func__ << "Failed to get decoder from process reference.";

  return decoder;
}