chromium/chromecast/cast_core/runtime/browser/message_port_service_grpc.cc

// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chromecast/cast_core/runtime/browser/message_port_service_grpc.h"

#include <sstream>
#include <string_view>

#include "base/logging.h"
#include "base/task/bind_post_task.h"
#include "base/task/sequenced_task_runner.h"
#include "chromecast/cast_core/runtime/browser/message_port_handler.h"

namespace chromecast {

MessagePortServiceGrpc::MessagePortServiceGrpc(
    cast::v2::CoreMessagePortApplicationServiceStub* core_app_stub)
    : core_app_stub_(core_app_stub),
      task_runner_(base::SequencedTaskRunner::GetCurrentDefault()) {
  DCHECK(core_app_stub_);
}

MessagePortServiceGrpc::~MessagePortServiceGrpc() = default;

cast_receiver::Status MessagePortServiceGrpc::HandleMessage(
    cast::web::Message message) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  cast::web::MessagePortStatus response;
  const uint32_t channel_id = message.channel().channel_id();
  auto entry = ports_.find(channel_id);
  if (entry == ports_.end()) {
    std::stringstream error_ss;
    error_ss << "Got message for unknown channel: " << channel_id;
    return cast_receiver::Status(cast_receiver::StatusCode::kUnknown,
                                 error_ss.str());
  }

  return entry->second->HandleMessage(message);
}

void MessagePortServiceGrpc::ConnectToPortAsync(
    std::string_view port_name,
    std::unique_ptr<cast_api_bindings::MessagePort> port) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DLOG(INFO) << "Connecting to port '" << port_name << "' as channel "
             << next_outgoing_channel_id_;

  const uint32_t channel_id = next_outgoing_channel_id_++;
  auto result = ports_.emplace(
      channel_id, MakeMessagePortHandler(channel_id, std::move(port)));
  DCHECK(result.second);

  auto call = core_app_stub_->CreateCall<
      cast::v2::CoreMessagePortApplicationServiceStub::Connect>();
  call.request().set_port_name(std::string(port_name));
  cast::web::MessagePortDescriptor* port_descriptor =
      call.request().mutable_port();
  port_descriptor->mutable_channel()->set_channel_id(channel_id);
  port_descriptor->mutable_peer_status()->set_status(
      cast::web::MessagePortStatus_Status_STARTED);
  port_descriptor->set_sequence_number(0);

  std::move(call).InvokeAsync(base::BindPostTask(
      task_runner_,
      base::BindOnce(&MessagePortServiceGrpc::OnPortConnectionEstablished,
                     weak_factory_.GetWeakPtr(), channel_id)));
}

uint32_t MessagePortServiceGrpc::RegisterOutgoingPort(
    std::unique_ptr<cast_api_bindings::MessagePort> port) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  uint32_t channel_id = next_outgoing_channel_id_++;
  ports_.emplace(channel_id,
                 MakeMessagePortHandler(channel_id, std::move(port)));
  return channel_id;
}

void MessagePortServiceGrpc::RegisterIncomingPort(
    uint32_t channel_id,
    std::unique_ptr<cast_api_bindings::MessagePort> port) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  auto result = ports_.emplace(
      channel_id, MakeMessagePortHandler(channel_id, std::move(port)));
  DCHECK(result.second);
}

void MessagePortServiceGrpc::Remove(uint32_t channel_id) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  ports_.erase(channel_id);
}

std::unique_ptr<MessagePortHandler>
MessagePortServiceGrpc::MakeMessagePortHandler(
    uint32_t channel_id,
    std::unique_ptr<cast_api_bindings::MessagePort> port) {
  auto port_handler = std::make_unique<MessagePortHandler>(
      std::move(port), channel_id, this, core_app_stub_, task_runner_);
  return port_handler;
}

void MessagePortServiceGrpc::OnPortConnectionEstablished(
    uint32_t channel_id,
    cast::utils::GrpcStatusOr<cast::bindings::ConnectResponse> response_or) {
  if (!response_or.ok()) {
    LOG(ERROR) << "Message port connect failed: channel_id=" << channel_id;
    Remove(channel_id);
  }
}

}  // namespace chromecast