// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/nearby_sharing/instantmessaging/receive_messages_express.h"
#include <sstream>
#include <string_view>
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/metrics/histogram_functions.h"
#include "base/strings/stringprintf.h"
#include "chrome/browser/nearby_sharing/instantmessaging/constants.h"
#include "chrome/browser/nearby_sharing/instantmessaging/proto/instantmessaging.pb.h"
#include "chrome/browser/nearby_sharing/instantmessaging/token_fetcher.h"
#include "chrome/browser/nearby_sharing/webrtc_request_builder.h"
#include "chromeos/ash/components/nearby/common/client/nearby_http_result.h"
#include "components/cross_device/logging/logging.h"
#include "mojo/public/cpp/bindings/self_owned_receiver.h"
#include "net/base/load_flags.h"
#include "services/network/public/cpp/resource_request.h"
#include "services/network/public/cpp/shared_url_loader_factory.h"
#include "services/network/public/cpp/simple_url_loader.h"
#include "url/gurl.h"
namespace {
const base::TimeDelta kFastPathReadyTimeout = base::Milliseconds(2500);
// Timeout for the receive messages stream, from when the stream first opens.
// This timeout applies to the Tachyon signaling process, so once we establish
// the peer-to-peer connection this stream and timeout will be canceled. There
// are other timeouts in the WebRTC medium that will cancel the signaling
// process sooner than 60s, so this is just a failsafe to make sure we clean up
// the ReceiveMessagesExpress if something goes wrong.
const base::TimeDelta kStreamTimeout = base::Seconds(60);
const net::NetworkTrafficAnnotationTag kTrafficAnnotation =
net::DefineNetworkTrafficAnnotation("receive_messages_express", R"(
semantics {
sender: "ReceiveMessagesExpress"
description:
"Receives messages sent from another device via a Gaia "
"authenticated Google messaging backend."
trigger:
"Peer uses any Chrome cross-device sharing feature and selects "
"this devices to send the data to."
data: "WebRTC session description protocol messages are exchanged "
"between devices to set up a peer to peer connection as documented "
"in https://tools.ietf.org/html/rfc4566 and "
"https://www.w3.org/TR/webrtc/#session-description-model. No user "
"data is sent in the request."
destination: GOOGLE_OWNED_SERVICE
}
policy {
cookies_allowed: NO
setting:
"This feature is only enabled for signed-in users who enable "
"Nearby sharing or Phone Hub."
chrome_policy {
NearbyShareAllowed {
policy_options {mode: MANDATORY}
NearbyShareAllowed: 0
},
PhoneHubAllowed {
policy_options {mode: MANDATORY}
PhoneHubAllowed: 0
}
}
})");
std::optional<ash::nearby::NearbyHttpStatus> HttpStatusFromUrlLoader(
const network::SimpleURLLoader* loader) {
if (!loader)
return std::nullopt;
return ash::nearby::NearbyHttpStatus(loader->NetError(),
loader->ResponseInfo());
}
void LogReceiveResult(
bool success,
const std::optional<ash::nearby::NearbyHttpStatus>& http_status,
const std::string& request_id) {
std::stringstream ss;
ss << "Instant messaging receive express "
<< (success ? "succeeded" : "failed") << " for request " << request_id;
base::UmaHistogramBoolean(
"Nearby.Connections.InstantMessaging.ReceiveExpress.Result", success);
if (http_status) {
ss << " HTTP status: " << *http_status;
if (!success) {
base::UmaHistogramSparse(
"Nearby.Connections.InstantMessaging.ReceiveExpress.Result."
"FailureReason",
http_status->GetResultCodeForMetrics());
}
}
if (success) {
CD_LOG(INFO, Feature::NS) << ss.str();
} else {
CD_LOG(ERROR, Feature::NS) << ss.str();
}
}
} // namespace
// static
void ReceiveMessagesExpress::StartReceiveSession(
const std::string& self_id,
sharing::mojom::LocationHintPtr location_hint,
mojo::PendingRemote<sharing::mojom::IncomingMessagesListener>
incoming_messages_listener,
StartReceivingMessagesCallback callback,
signin::IdentityManager* identity_manager,
scoped_refptr<network::SharedURLLoaderFactory> url_loader_factory) {
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesExpressRequest
request = BuildReceiveRequest(self_id, std::move(location_hint));
CD_LOG(INFO, Feature::NS) << __func__ << ": self_id=" << self_id
<< ", request id=" << request.header().request_id();
auto receive_messages_express = base::WrapUnique(
new ReceiveMessagesExpress(std::move(incoming_messages_listener),
identity_manager, url_loader_factory));
// Created a mojo pipe for the session that can be used to stop receiving.
mojo::PendingRemote<sharing::mojom::ReceiveMessagesSession> pending_remote;
mojo::PendingReceiver<sharing::mojom::ReceiveMessagesSession>
pending_receiver = pending_remote.InitWithNewPipeAndPassReceiver();
receive_messages_express->StartReceivingMessages(request, std::move(callback),
std::move(pending_remote));
mojo::MakeSelfOwnedReceiver(std::move(receive_messages_express),
std::move(pending_receiver));
}
ReceiveMessagesExpress::ReceiveMessagesExpress(
mojo::PendingRemote<sharing::mojom::IncomingMessagesListener>
incoming_messages_listener,
signin::IdentityManager* identity_manager,
scoped_refptr<network::SharedURLLoaderFactory> url_loader_factory)
: incoming_messages_listener_(std::move(incoming_messages_listener)),
token_fetcher_(identity_manager),
url_loader_factory_(std::move(url_loader_factory)) {}
ReceiveMessagesExpress::~ReceiveMessagesExpress() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CD_LOG(VERBOSE, Feature::NS)
<< __func__
<< ": Receive messages session going down, request id=" << request_id_;
fast_path_ready_timeout_timer_.Stop();
if (start_receiving_messages_callback_) {
std::move(start_receiving_messages_callback_)
.Run(false, mojo::NullRemote());
}
}
void ReceiveMessagesExpress::StartReceivingMessages(
const chrome_browser_nearby_sharing_instantmessaging::
ReceiveMessagesExpressRequest& request,
StartReceivingMessagesCallback start_receiving_messages_callback,
mojo::PendingRemote<sharing::mojom::ReceiveMessagesSession>
pending_remote_for_result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!url_loader_);
CD_LOG(VERBOSE, Feature::NS)
<< "ReceiveMessagesExpress::StartReceivingMessages() called.";
request_id_ = request.header().request_id();
// Used to complete the initial mojo call once fast path is received.
start_receiving_messages_callback_ =
std::move(start_receiving_messages_callback);
// This is the remote side of the self owned mojo pipe that will be returned
// when completing start_receiving_messages_callback
self_pending_remote_ = std::move(pending_remote_for_result);
token_fetcher_.GetAccessToken(
base::BindOnce(&ReceiveMessagesExpress::DoStartReceivingMessages,
weak_ptr_factory_.GetWeakPtr(), request));
}
void ReceiveMessagesExpress::DoStartReceivingMessages(
const chrome_browser_nearby_sharing_instantmessaging::
ReceiveMessagesExpressRequest& request,
const std::string& oauth_token) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(url_loader_ == nullptr);
base::UmaHistogramBoolean(
"Nearby.Connections.InstantMessaging.ReceiveExpress."
"OAuthTokenFetchResult",
!oauth_token.empty());
if (oauth_token.empty()) {
FailSessionAndDestruct("Auth token fetch failed");
// |this| may be destroyed here.
return;
}
CD_LOG(VERBOSE, Feature::NS)
<< __func__ << ": OAuth token fetched; starting stream download";
auto resource_request = std::make_unique<network::ResourceRequest>();
resource_request->url = GURL(kInstantMessagingReceiveMessageAPI);
resource_request->load_flags =
net::LOAD_BYPASS_CACHE | net::LOAD_DISABLE_CACHE;
resource_request->credentials_mode = network::mojom::CredentialsMode::kOmit;
resource_request->method = net::HttpRequestHeaders::kPostMethod;
resource_request->headers.AddHeaderFromString(
base::StringPrintf(kAuthorizationHeaderFormat, oauth_token.c_str()));
url_loader_ = network::SimpleURLLoader::Create(std::move(resource_request),
kTrafficAnnotation);
url_loader_->SetTimeoutDuration(kStreamTimeout);
url_loader_->AttachStringForUpload(request.SerializeAsString(),
"application/x-protobuf");
url_loader_->DownloadAsStream(url_loader_factory_.get(), this);
// We are safe to use base::Unretained() here because if
// ReceiveMessagesExpress is destroyed the timer will go out of scope first
// which will cancel it.
fast_path_ready_timeout_timer_.Start(
FROM_HERE, kFastPathReadyTimeout,
base::BindOnce(&ReceiveMessagesExpress::OnFastPathReadyTimeout,
base::Unretained(this)));
}
void ReceiveMessagesExpress::OnFastPathReadyTimeout() {
CD_LOG(WARNING, Feature::NS) << __func__;
FailSessionAndDestruct("Timeout before receiving fast path ready");
// |this| will be destroyed here.
return;
}
void ReceiveMessagesExpress::StopReceivingMessages() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
fast_path_ready_timeout_timer_.Stop();
// Cancel any pending calls into this object.
weak_ptr_factory_.InvalidateWeakPtrs();
// This implicitly cancels the download stream. We intentionally don't call
// OnComplete() when the other side calls StopReceivingMessages().
url_loader_.reset();
CD_LOG(VERBOSE, Feature::NS)
<< __func__ << ": callback already invoked? "
<< (start_receiving_messages_callback_ ? "no" : "yes");
if (start_receiving_messages_callback_) {
FailSessionAndDestruct(
"StopReceivingMessages() called before fast path ready was received");
// |this| destroyed here.
return;
}
}
void ReceiveMessagesExpress::OnDataReceived(std::string_view data,
base::OnceClosure resume) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
for (auto response : stream_parser_.Append(data)) {
DelegateMessage(response);
}
std::move(resume).Run();
}
void ReceiveMessagesExpress::DelegateMessage(
const chrome_browser_nearby_sharing_instantmessaging::
ReceiveMessagesResponse& response) {
// Security Note - The ReceiveMessagesResponse proto is coming from a trusted
// Google server (Tachyon) from the signaling channel for webrtc messages for
// sharing messages and hence can be parsed on the browser process.
// The message contained within the proto is untrusted and should be parsed
// within a sandbox process.
switch (response.body_case()) {
case chrome_browser_nearby_sharing_instantmessaging::
ReceiveMessagesResponse::kFastPathReady:
OnFastPathReady();
break;
case chrome_browser_nearby_sharing_instantmessaging::
ReceiveMessagesResponse::kInboxMessage:
OnMessageReceived(response.inbox_message().message());
break;
default:
CD_LOG(ERROR, Feature::NS)
<< __func__
<< ": message body case was unexpected: " << response.body_case();
NOTREACHED_IN_MIGRATION();
}
}
void ReceiveMessagesExpress::OnComplete(bool success) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
fast_path_ready_timeout_timer_.Stop();
std::optional<ash::nearby::NearbyHttpStatus> http_status =
HttpStatusFromUrlLoader(url_loader_.get());
CD_LOG(VERBOSE, Feature::NS)
<< __func__ << ": success? " << (success ? "yes" : "no")
<< ", start calback invoked? "
<< (start_receiving_messages_callback_ ? "no" : "yes") << ", net::Error "
<< url_loader_->NetError();
if (start_receiving_messages_callback_) {
LogReceiveResult(success, http_status, request_id_);
// If we have not called start_receiving_messages_callback_ yet, we
// consider that a failure and need to complete the mojo call with a
// failure.
FailSessionAndDestruct("Download stream ended before fast path ready");
// |this| will be destroyed here.
return;
} else {
// Only call OnComplete() if the start callback has been invoked, meaning
// the stream has opened and we have received "fast path ready".
incoming_messages_listener_->OnComplete(success);
}
}
void ReceiveMessagesExpress::OnRetry(base::OnceClosure start_retry) {
CD_LOG(ERROR, Feature::NS)
<< __func__ << ": retry is not implemented for the url_fetcher";
NOTIMPLEMENTED();
}
void ReceiveMessagesExpress::OnFastPathReady() {
CD_LOG(VERBOSE, Feature::NS) << __func__;
fast_path_ready_timeout_timer_.Stop();
if (start_receiving_messages_callback_) {
LogReceiveResult(/*success=*/true, /*http_status=*/std::nullopt,
request_id_);
std::move(start_receiving_messages_callback_)
.Run(true, std::move(self_pending_remote_));
}
}
void ReceiveMessagesExpress::OnMessageReceived(const std::string& message) {
CD_LOG(VERBOSE, Feature::NS)
<< __func__ << ": message size: " << message.size();
if (!incoming_messages_listener_) {
CD_LOG(WARNING, Feature::NS)
<< __func__ << ": no listener available to receive message";
return;
}
incoming_messages_listener_->OnMessage(message);
}
void ReceiveMessagesExpress::FailSessionAndDestruct(const std::string reason) {
// Cancel any pending calls into this object.
weak_ptr_factory_.InvalidateWeakPtrs();
// Explicitly stop any pending downloads if there are any.
url_loader_.reset();
if (start_receiving_messages_callback_) {
// We don't give the remote in the callback because at this point
// calling StopReceiveMessages won't do anything.
std::move(start_receiving_messages_callback_)
.Run(false, mojo::NullRemote());
}
CD_LOG(ERROR, Feature::NS)
<< __func__ << ": Terminating receive message express session: ["
<< reason << "]";
// If we have not returned self_pending_remote_ to the caller, This will kill
// the self-owned mojo pipe and implicitly destroy this object. If we have
// given out this pending remote through |start_receiving_messages_callback_|,
// the other side of the pipe controls the lifetime of this object and this
// reset does nothing.
self_pending_remote_.reset();
}