chromium/chromeos/ash/services/libassistant/chromium_http_connection.cc

// Copyright 2018 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// The file comes from Google Home(cast) implementation.

#include "chromeos/ash/services/libassistant/chromium_http_connection.h"

#include <algorithm>
#include <memory>
#include <string_view>
#include <utility>

#include "base/containers/span.h"
#include "base/logging.h"
#include "base/task/thread_pool.h"
#include "mojo/public/cpp/bindings/pending_remote.h"
#include "net/base/load_flags.h"
#include "services/network/public/cpp/header_util.h"
#include "services/network/public/cpp/resource_request.h"
#include "services/network/public/cpp/shared_url_loader_factory.h"
#include "services/network/public/cpp/simple_url_loader.h"
#include "services/network/public/mojom/url_response_head.mojom.h"

using assistant_client::HttpConnection;
using network::PendingSharedURLLoaderFactory;
using network::SharedURLLoaderFactory;

// A macro which ensures we are running in |task_runner_|'s sequence.
#define ENSURE_IN_SEQUENCE(method, ...)                                  \
  if (!task_runner_->RunsTasksInCurrentSequence()) {                     \
    task_runner_->PostTask(FROM_HERE,                                    \
                           base::BindOnce(method, this, ##__VA_ARGS__)); \
    return;                                                              \
  }

namespace ash::libassistant {

namespace {

// Invalid/Unknown HTTP response code.
constexpr int kResponseCodeInvalid = -1;

}  // namespace

ChromiumHttpConnection::ChromiumHttpConnection(
    std::unique_ptr<PendingSharedURLLoaderFactory> pending_url_loader_factory,
    Delegate* delegate)
    : delegate_(delegate),
      task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})),
      pending_url_loader_factory_(std::move(pending_url_loader_factory)) {
  DCHECK(delegate_);
  DCHECK(pending_url_loader_factory_);

  // Add a reference, so |this| cannot go away until Close() is called.
  AddRef();
}

ChromiumHttpConnection::~ChromiumHttpConnection() {
  // The destructor may be called on another sequence when the connection
  // is cancelled early, for example due to a reconfigure event.
  DCHECK_EQ(state_, State::DESTROYED);
}

void ChromiumHttpConnection::SetRequest(const std::string& url, Method method) {
  ENSURE_IN_SEQUENCE(&ChromiumHttpConnection::SetRequest, url, method);
  DCHECK_EQ(state_, State::NEW);
  url_ = GURL(url);
  method_ = method;
}

void ChromiumHttpConnection::AddHeader(const std::string& name,
                                       const std::string& value) {
  ENSURE_IN_SEQUENCE(&ChromiumHttpConnection::AddHeader, name, value);
  DCHECK_EQ(state_, State::NEW);

  if (!network::IsRequestHeaderSafe(name, value)) {
    VLOG(2) << "Ignoring unsafe request header: " << name;
    return;
  }

  // From https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2:
  // "Multiple message-header fields with the same field-name MAY be present in
  // a message if and only if the entire field-value for that header field is
  // defined as a comma-separated list [i.e., #(values)]. It MUST be possible to
  // combine the multiple header fields into one "field-name: field-value" pair,
  // without changing the semantics of the message, by appending each subsequent
  // field-value to the first, each separated by a comma."
  std::optional<std::string> existing_value = headers_.GetHeader(name);
  if (existing_value) {
    headers_.SetHeader(name, *existing_value + ',' + value);
  } else {
    headers_.SetHeader(name, value);
  }
}

void ChromiumHttpConnection::SetUploadContent(const std::string& content,
                                              const std::string& content_type) {
  ENSURE_IN_SEQUENCE(&ChromiumHttpConnection::SetUploadContent, content,
                     content_type);
  DCHECK_EQ(state_, State::NEW);
  upload_content_ = content;
  upload_content_type_ = content_type;
  chunked_upload_content_type_ = "";
}

void ChromiumHttpConnection::SetChunkedUploadContentType(
    const std::string& content_type) {
  ENSURE_IN_SEQUENCE(&ChromiumHttpConnection::SetChunkedUploadContentType,
                     content_type);
  DCHECK_EQ(state_, State::NEW);
  upload_content_ = "";
  upload_content_type_ = "";
  chunked_upload_content_type_ = content_type;
  AddHeader(::net::HttpRequestHeaders::kContentType, content_type);
}

void ChromiumHttpConnection::EnableHeaderResponse() {
  ENSURE_IN_SEQUENCE(&ChromiumHttpConnection::EnableHeaderResponse)
  enable_header_response_ = true;
}

void ChromiumHttpConnection::EnablePartialResults() {
  ENSURE_IN_SEQUENCE(&ChromiumHttpConnection::EnablePartialResults);
  DCHECK_EQ(state_, State::NEW);
  handle_partial_response_ = true;
}

void ChromiumHttpConnection::Start() {
  ENSURE_IN_SEQUENCE(&ChromiumHttpConnection::Start);
  DCHECK_EQ(state_, State::NEW);
  state_ = State::STARTED;

  if (!url_.is_valid()) {
    state_ = State::COMPLETED;
    VLOG(2) << "Completing connection with invalid URL";
    delegate_->OnNetworkError(kResponseCodeInvalid, "Invalid GURL");
    return;
  }

  auto resource_request = std::make_unique<network::ResourceRequest>();
  resource_request->url = url_;
  resource_request->headers = headers_;
  switch (method_) {
    case Method::GET:
      resource_request->method = "GET";
      break;
    case Method::POST:
      resource_request->method = "POST";
      break;
    case Method::HEAD:
      resource_request->method = "HEAD";
      break;
    case Method::PATCH:
      resource_request->method = "PATCH";
      break;
    case Method::PUT:
      resource_request->method = "PUT";
      break;
    case Method::DELETE:
      resource_request->method = "DELETE";
      break;
  }
  resource_request->credentials_mode = network::mojom::CredentialsMode::kOmit;

  const bool chunked_upload =
      !chunked_upload_content_type_.empty() && method_ == Method::POST;
  if (chunked_upload) {
    // Attach a chunked upload body.
    mojo::PendingRemote<network::mojom::ChunkedDataPipeGetter> data_remote;
    receiver_set_.Add(this, data_remote.InitWithNewPipeAndPassReceiver());
    resource_request->request_body = new network::ResourceRequestBody();
    resource_request->request_body->SetToChunkedDataPipe(
        std::move(data_remote),
        network::ResourceRequestBody::ReadOnlyOnce(false));
  }

  url_loader_ = network::SimpleURLLoader::Create(std::move(resource_request),
                                                 NO_TRAFFIC_ANNOTATION_YET);
  url_loader_->SetRetryOptions(
      /*max_retries=*/0, network::SimpleURLLoader::RETRY_NEVER);
  if (!upload_content_type_.empty())
    url_loader_->AttachStringForUpload(upload_content_, upload_content_type_);

  auto factory =
      SharedURLLoaderFactory::Create(std::move(pending_url_loader_factory_));

  if (handle_partial_response_) {
    url_loader_->SetOnResponseStartedCallback(
        base::BindOnce(&ChromiumHttpConnection::OnResponseStarted, this));
    url_loader_->DownloadAsStream(factory.get(), this);
  } else {
    url_loader_->DownloadToStringOfUnboundedSizeUntilCrashAndDie(
        factory.get(),
        base::BindOnce(&ChromiumHttpConnection::OnURLLoadComplete, this));
  }
}

void ChromiumHttpConnection::Pause() {
  ENSURE_IN_SEQUENCE(&ChromiumHttpConnection::Pause);
  is_paused_ = true;
}

void ChromiumHttpConnection::Resume() {
  ENSURE_IN_SEQUENCE(&ChromiumHttpConnection::Resume);
  is_paused_ = false;

  if (!partial_response_cache_.empty()) {
    delegate_->OnPartialResponse(partial_response_cache_);
    partial_response_cache_.clear();
  }

  if (on_resume_callback_)
    std::move(on_resume_callback_).Run();
}

void ChromiumHttpConnection::Close() {
  ENSURE_IN_SEQUENCE(&ChromiumHttpConnection::Close);
  if (state_ == State::DESTROYED)
    return;

  state_ = State::DESTROYED;
  url_loader_.reset();

  delegate_->OnConnectionDestroyed();

  Release();
}

void ChromiumHttpConnection::UploadData(const std::string& data,
                                        bool is_last_chunk) {
  ENSURE_IN_SEQUENCE(&ChromiumHttpConnection::UploadData, data, is_last_chunk);
  if (state_ != State::STARTED)
    return;

  upload_body_ += data;

  upload_body_size_ += data.size();
  if (is_last_chunk) {
    // Send size before the rest of the body. While it doesn't matter much, if
    // the other side receives the size before the last chunk, which Mojo does
    // not guarantee, some protocols can merge the data and the last chunk
    // itself into a single frame.
    has_last_chunk_ = is_last_chunk;
    if (get_size_callback_)
      std::move(get_size_callback_).Run(net::OK, upload_body_size_);
  }

  SendData();
}

void ChromiumHttpConnection::GetSize(GetSizeCallback get_size_callback) {
  if (has_last_chunk_)
    std::move(get_size_callback).Run(net::OK, upload_body_size_);
  else
    get_size_callback_ = std::move(get_size_callback);
}

void ChromiumHttpConnection::StartReading(
    mojo::ScopedDataPipeProducerHandle pipe) {
  // Delete any existing pipe, if any.
  upload_pipe_watcher_.reset();
  upload_pipe_ = std::move(pipe);
  upload_pipe_watcher_ = std::make_unique<mojo::SimpleWatcher>(
      FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL);
  upload_pipe_watcher_->Watch(
      upload_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
      base::BindRepeating(&ChromiumHttpConnection::OnUploadPipeWriteable,
                          base::Unretained(this)));

  // Will attempt to start sending the request body, if any data is available.
  SendData();
}

void ChromiumHttpConnection::OnDataReceived(std::string_view string_piece,
                                            base::OnceClosure resume) {
  DCHECK(handle_partial_response_);

  if (is_paused_) {
    // If the connection is paused, stop sending |OnPartialResponse|
    // notification to the delegate and cache the response part.
    on_resume_callback_ = std::move(resume);
    DCHECK(partial_response_cache_.empty());
    partial_response_cache_ = std::string(string_piece);
  } else {
    DCHECK(partial_response_cache_.empty());
    delegate_->OnPartialResponse(std::string(string_piece));
    std::move(resume).Run();
  }
}

void ChromiumHttpConnection::OnComplete(bool success) {
  DCHECK(handle_partial_response_);

  if (state_ != State::STARTED)
    return;

  state_ = State::COMPLETED;

  int response_code = kResponseCodeInvalid;
  std::string raw_headers;
  if (url_loader_->ResponseInfo() && url_loader_->ResponseInfo()->headers) {
    raw_headers = url_loader_->ResponseInfo()->headers->raw_headers();
    response_code = url_loader_->ResponseInfo()->headers->response_code();
  }

  if (response_code != kResponseCodeInvalid) {
    delegate_->OnCompleteResponse(response_code, raw_headers, /*response=*/"");
    return;
  }

  const std::string message = net::ErrorToString(url_loader_->NetError());
  VLOG(3) << "ChromiumHttpConnection completed with network error="
          << url_loader_->NetError() << ": " << message;
  delegate_->OnNetworkError(url_loader_->NetError(), message);
}

void ChromiumHttpConnection::OnRetry(base::OnceClosure start_retry) {
  DCHECK(handle_partial_response_);
  // Retries are not enabled for these requests.
  NOTREACHED_IN_MIGRATION();
}

// Attempts to send more of the upload body, if more data is available, and
// |upload_pipe_| is valid.
void ChromiumHttpConnection::SendData() {
  if (!upload_pipe_.is_valid() || upload_body_.empty()) {
    return;
  }

  size_t bytes_written = 0;
  MojoResult result =
      upload_pipe_->WriteData(base::as_byte_span(upload_body_),
                              MOJO_WRITE_DATA_FLAG_NONE, bytes_written);

  if (result == MOJO_RESULT_SHOULD_WAIT) {
    // Wait for the pipe to have more capacity available.
    upload_pipe_watcher_->ArmOrNotify();
    return;
  }

  // Depend on |url_loader_| to notice the other pipes being closed on error.
  if (result != MOJO_RESULT_OK)
    return;

  upload_body_.erase(0, bytes_written);

  // If more data is available, arm the watcher again. Don't write again in a
  // loop, even if WriteData would allow it, to avoid blocking the current
  // thread.
  if (!upload_body_.empty()) {
    upload_pipe_watcher_->ArmOrNotify();
  }
}

void ChromiumHttpConnection::OnUploadPipeWriteable(MojoResult unused) {
  SendData();
}

void ChromiumHttpConnection::OnURLLoadComplete(
    std::unique_ptr<std::string> response_body) {
  DCHECK(!handle_partial_response_);

  if (state_ != State::STARTED)
    return;

  state_ = State::COMPLETED;

  if (url_loader_->NetError() != net::OK) {
    delegate_->OnNetworkError(kResponseCodeInvalid,
                              net::ErrorToString(url_loader_->NetError()));
    return;
  }

  int response_code = kResponseCodeInvalid;
  std::string raw_headers;
  if (url_loader_->ResponseInfo() && url_loader_->ResponseInfo()->headers) {
    raw_headers = url_loader_->ResponseInfo()->headers->raw_headers();
    response_code = url_loader_->ResponseInfo()->headers->response_code();
  }

  if (response_code == kResponseCodeInvalid) {
    std::string message = net::ErrorToString(url_loader_->NetError());

    VLOG(3) << "ChromiumHttpConnection completed with network error="
            << response_code << ": " << message;
    delegate_->OnNetworkError(response_code, message);
    return;
  }

  VLOG(3) << "ChromiumHttpConnection completed with response_code="
          << response_code;

  delegate_->OnCompleteResponse(response_code, raw_headers, *response_body);
}

void ChromiumHttpConnection::OnResponseStarted(
    const GURL& final_url,
    const network::mojom::URLResponseHead& response_header) {
  if (enable_header_response_ && response_header.headers) {
    // Only propagate |OnHeaderResponse()| once before any |OnPartialResponse()|
    // invoked to honor the API contract.
    delegate_->OnHeaderResponse(response_header.headers->raw_headers());
  }
}

ChromiumHttpConnectionFactory::ChromiumHttpConnectionFactory(
    std::unique_ptr<PendingSharedURLLoaderFactory> pending_url_loader_factory)
    : url_loader_factory_(SharedURLLoaderFactory::Create(
          std::move(pending_url_loader_factory))) {}

ChromiumHttpConnectionFactory::~ChromiumHttpConnectionFactory() = default;

HttpConnection* ChromiumHttpConnectionFactory::Create(
    HttpConnection::Delegate* delegate) {
  return new ChromiumHttpConnection(url_loader_factory_->Clone(), delegate);
}

}  // namespace ash::libassistant