chromium/components/cronet/android/cronet_bidirectional_stream_adapter.cc

// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "cronet_bidirectional_stream_adapter.h"

#include <string>
#include <utility>
#include <vector>

#include "base/functional/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/strings/string_number_conversions.h"
#include "components/cronet/android/cronet_context_adapter.h"
#include "components/cronet/android/io_buffer_with_byte_buffer.h"
#include "components/cronet/android/url_request_close_source.h"
#include "components/cronet/android/url_request_error.h"
#include "components/cronet/metrics_util.h"
#include "net/base/http_user_agent_settings.h"
#include "net/base/net_errors.h"
#include "net/base/request_priority.h"
#include "net/http/bidirectional_stream_request_info.h"
#include "net/http/http_network_session.h"
#include "net/http/http_response_headers.h"
#include "net/http/http_status_code.h"
#include "net/http/http_transaction_factory.h"
#include "net/http/http_util.h"
#include "net/ssl/ssl_info.h"
#include "net/third_party/quiche/src/quiche/quic/core/quic_packets.h"
#include "net/url_request/url_request_context.h"
#include "url/gurl.h"

// Must come after all headers that specialize FromJniType() / ToJniType().
#include "components/cronet/android/cronet_jni_headers/CronetBidirectionalStream_jni.h"

using base::android::ConvertUTF8ToJavaString;
using base::android::ConvertJavaStringToUTF8;
using base::android::JavaRef;
using base::android::ScopedJavaLocalRef;

namespace cronet {

namespace {

// As |GetArrayLength| makes no guarantees about the returned value (e.g., it
// may be -1 if |array| is not a valid Java array), provide a safe wrapper
// that always returns a valid, non-negative size.
template <typename JavaArrayType>
size_t SafeGetArrayLength(JNIEnv* env, JavaArrayType jarray) {
  DCHECK(jarray);
  jsize length = env->GetArrayLength(jarray);
  DCHECK_GE(length, 0) << "Invalid array length: " << length;
  return static_cast<size_t>(std::max(0, length));
}

}  // namespace

PendingWriteData::PendingWriteData(
    JNIEnv* env,
    const JavaRef<jobjectArray>& jwrite_buffer_list,
    const JavaRef<jintArray>& jwrite_buffer_pos_list,
    const JavaRef<jintArray>& jwrite_buffer_limit_list,
    jboolean jwrite_end_of_stream) {
  this->jwrite_buffer_list.Reset(jwrite_buffer_list);
  this->jwrite_buffer_pos_list.Reset(jwrite_buffer_pos_list);
  this->jwrite_buffer_limit_list.Reset(jwrite_buffer_limit_list);
  this->jwrite_end_of_stream = jwrite_end_of_stream;
}

PendingWriteData::~PendingWriteData() {
  // Reset global references.
  jwrite_buffer_list.Reset();
  jwrite_buffer_pos_list.Reset();
  jwrite_buffer_limit_list.Reset();
}

static jlong JNI_CronetBidirectionalStream_CreateBidirectionalStream(
    JNIEnv* env,
    const base::android::JavaParamRef<jobject>& jbidi_stream,
    jlong jurl_request_context_adapter,
    jboolean jsend_request_headers_automatically,
    jboolean jtraffic_stats_tag_set,
    jint jtraffic_stats_tag,
    jboolean jtraffic_stats_uid_set,
    jint jtraffic_stats_uid,
    jlong jnetwork_handle) {
  CronetContextAdapter* context_adapter =
      reinterpret_cast<CronetContextAdapter*>(jurl_request_context_adapter);
  DCHECK(context_adapter);

  CronetBidirectionalStreamAdapter* adapter =
      new CronetBidirectionalStreamAdapter(
          context_adapter, env, jbidi_stream,
          jsend_request_headers_automatically, jtraffic_stats_tag_set,
          jtraffic_stats_tag, jtraffic_stats_uid_set, jtraffic_stats_uid,
          jnetwork_handle);

  return reinterpret_cast<jlong>(adapter);
}

CronetBidirectionalStreamAdapter::CronetBidirectionalStreamAdapter(
    CronetContextAdapter* context,
    JNIEnv* env,
    const base::android::JavaParamRef<jobject>& jbidi_stream,
    bool send_request_headers_automatically,
    bool traffic_stats_tag_set,
    int32_t traffic_stats_tag,
    bool traffic_stats_uid_set,
    int32_t traffic_stats_uid,
    net::handles::NetworkHandle network)
    : context_(context),
      owner_(env, jbidi_stream),
      send_request_headers_automatically_(send_request_headers_automatically),
      traffic_stats_tag_set_(traffic_stats_tag_set),
      traffic_stats_tag_(traffic_stats_tag),
      traffic_stats_uid_set_(traffic_stats_uid_set),
      traffic_stats_uid_(traffic_stats_uid),
      network_(network),
      stream_failed_(false) {}

CronetBidirectionalStreamAdapter::~CronetBidirectionalStreamAdapter() {
  DCHECK(context_->IsOnNetworkThread());
}

void CronetBidirectionalStreamAdapter::SendRequestHeaders(
    JNIEnv* env,
    const base::android::JavaParamRef<jobject>& jcaller) {
  context_->PostTaskToNetworkThread(
      FROM_HERE,
      base::BindOnce(
          &CronetBidirectionalStreamAdapter::SendRequestHeadersOnNetworkThread,
          base::Unretained(this)));
}

jint CronetBidirectionalStreamAdapter::Start(
    JNIEnv* env,
    const base::android::JavaParamRef<jobject>& jcaller,
    const base::android::JavaParamRef<jstring>& jurl,
    jint jpriority,
    const base::android::JavaParamRef<jstring>& jmethod,
    const base::android::JavaParamRef<jobjectArray>& jheaders,
    jboolean jend_of_stream) {
  // Prepare request info here to be able to return the error.
  std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info(
      new net::BidirectionalStreamRequestInfo());
  request_info->url = GURL(ConvertJavaStringToUTF8(env, jurl));
  request_info->priority = static_cast<net::RequestPriority>(jpriority);
  // Http method is a token, just as header name.
  request_info->method = ConvertJavaStringToUTF8(env, jmethod);
  if (!net::HttpUtil::IsValidHeaderName(request_info->method))
    return -1;

  std::vector<std::string> headers;
  base::android::AppendJavaStringArrayToStringVector(env, jheaders, &headers);
  for (size_t i = 0; i < headers.size(); i += 2) {
    std::string name(headers[i]);
    std::string value(headers[i + 1]);
    if (!net::HttpUtil::IsValidHeaderName(name) ||
        !net::HttpUtil::IsValidHeaderValue(value)) {
      return i + 1;
    }
    request_info->extra_headers.SetHeader(name, value);
  }
  request_info->end_stream_on_headers = jend_of_stream;
  if (traffic_stats_tag_set_ || traffic_stats_uid_set_) {
    request_info->socket_tag = net::SocketTag(
        traffic_stats_uid_set_ ? traffic_stats_uid_ : net::SocketTag::UNSET_UID,
        traffic_stats_tag_set_ ? traffic_stats_tag_
                               : net::SocketTag::UNSET_TAG);
  }

  context_->PostTaskToNetworkThread(
      FROM_HERE,
      base::BindOnce(&CronetBidirectionalStreamAdapter::StartOnNetworkThread,
                     base::Unretained(this), std::move(request_info)));
  return 0;
}

jboolean CronetBidirectionalStreamAdapter::ReadData(
    JNIEnv* env,
    const base::android::JavaParamRef<jobject>& jcaller,
    const base::android::JavaParamRef<jobject>& jbyte_buffer,
    jint jposition,
    jint jlimit) {
  DCHECK_LT(jposition, jlimit);

  scoped_refptr<IOBufferWithByteBuffer> read_buffer(
      new IOBufferWithByteBuffer(env, jbyte_buffer, jposition, jlimit));

  int remaining_capacity = jlimit - jposition;

  context_->PostTaskToNetworkThread(
      FROM_HERE,
      base::BindOnce(&CronetBidirectionalStreamAdapter::ReadDataOnNetworkThread,
                     base::Unretained(this), read_buffer, remaining_capacity));
  return JNI_TRUE;
}

jboolean CronetBidirectionalStreamAdapter::WritevData(
    JNIEnv* env,
    const base::android::JavaParamRef<jobject>& jcaller,
    const base::android::JavaParamRef<jobjectArray>& jbyte_buffers,
    const base::android::JavaParamRef<jintArray>& jbyte_buffers_pos,
    const base::android::JavaParamRef<jintArray>& jbyte_buffers_limit,
    jboolean jend_of_stream) {
  size_t buffers_array_size = SafeGetArrayLength(env, jbyte_buffers.obj());
  size_t pos_array_size = SafeGetArrayLength(env, jbyte_buffers.obj());
  size_t limit_array_size = SafeGetArrayLength(env, jbyte_buffers.obj());
  if (buffers_array_size != pos_array_size ||
      buffers_array_size != limit_array_size) {
    DLOG(ERROR) << "Illegal arguments.";
    return JNI_FALSE;
  }

  std::unique_ptr<PendingWriteData> pending_write_data;
  pending_write_data.reset(
      new PendingWriteData(env, jbyte_buffers, jbyte_buffers_pos,
                           jbyte_buffers_limit, jend_of_stream));
  for (size_t i = 0; i < buffers_array_size; ++i) {
    ScopedJavaLocalRef<jobject> jbuffer(
        env, env->GetObjectArrayElement(
                 pending_write_data->jwrite_buffer_list.obj(), i));
    void* data = env->GetDirectBufferAddress(jbuffer.obj());
    if (!data)
      return JNI_FALSE;
    jint pos;
    env->GetIntArrayRegion(pending_write_data->jwrite_buffer_pos_list.obj(), i,
                           1, &pos);
    jint limit;
    env->GetIntArrayRegion(pending_write_data->jwrite_buffer_limit_list.obj(),
                           i, 1, &limit);
    auto write_buffer = base::MakeRefCounted<net::WrappedIOBuffer>(
        base::make_span(static_cast<char*>(data), static_cast<size_t>(limit))
            .subspan(pos));
    pending_write_data->write_buffer_list.push_back(write_buffer);
    pending_write_data->write_buffer_len_list.push_back(write_buffer->size());
  }

  context_->PostTaskToNetworkThread(
      FROM_HERE,
      base::BindOnce(
          &CronetBidirectionalStreamAdapter::WritevDataOnNetworkThread,
          base::Unretained(this), std::move(pending_write_data)));
  return JNI_TRUE;
}

void CronetBidirectionalStreamAdapter::Destroy(
    JNIEnv* env,
    const base::android::JavaParamRef<jobject>& jcaller,
    jboolean jsend_on_canceled) {
  // Destroy could be called from any thread, including network thread (if
  // posting task to executor throws an exception), but is posted, so |this|
  // is valid until calling task is complete. Destroy() is always called from
  // within a synchronized java block that guarantees no future posts to the
  // network thread with the adapter pointer.
  context_->PostTaskToNetworkThread(
      FROM_HERE,
      base::BindOnce(&CronetBidirectionalStreamAdapter::DestroyOnNetworkThread,
                     base::Unretained(this), jsend_on_canceled));
}

void CronetBidirectionalStreamAdapter::OnStreamReady(
    bool request_headers_sent) {
  DCHECK(context_->IsOnNetworkThread());
  JNIEnv* env = base::android::AttachCurrentThread();
  cronet::Java_CronetBidirectionalStream_onStreamReady(
      env, owner_, request_headers_sent ? JNI_TRUE : JNI_FALSE);
}

void CronetBidirectionalStreamAdapter::OnHeadersReceived(
    const quiche::HttpHeaderBlock& response_headers) {
  DCHECK(context_->IsOnNetworkThread());
  JNIEnv* env = base::android::AttachCurrentThread();
  // Get http status code from response headers.
  jint http_status_code = 0;
  const auto http_status_header = response_headers.find(":status");
  if (http_status_header != response_headers.end()) {
    base::StringToInt(http_status_header->second, &http_status_code);
  }

  std::string protocol;
  switch (bidi_stream_->GetProtocol()) {
    case net::kProtoHTTP2:
      protocol = "h2";
      break;
    case net::kProtoQUIC:
      protocol = "quic/1+spdy/3";
      break;
    default:
      break;
  }

  cronet::Java_CronetBidirectionalStream_onResponseHeadersReceived(
      env, owner_, http_status_code, ConvertUTF8ToJavaString(env, protocol),
      GetHeadersArray(env, response_headers),
      bidi_stream_->GetTotalReceivedBytes());
}

void CronetBidirectionalStreamAdapter::OnDataRead(int bytes_read) {
  DCHECK(context_->IsOnNetworkThread());
  JNIEnv* env = base::android::AttachCurrentThread();
  cronet::Java_CronetBidirectionalStream_onReadCompleted(
      env, owner_, read_buffer_->byte_buffer(), bytes_read,
      read_buffer_->initial_position(), read_buffer_->initial_limit(),
      bidi_stream_->GetTotalReceivedBytes());
  // Free the read buffer. This lets the Java ByteBuffer be freed, if the
  // embedder releases it, too.
  read_buffer_ = nullptr;
}

void CronetBidirectionalStreamAdapter::OnDataSent() {
  DCHECK(context_->IsOnNetworkThread());
  DCHECK(pending_write_data_);

  JNIEnv* env = base::android::AttachCurrentThread();
  // Call into Java.
  cronet::Java_CronetBidirectionalStream_onWritevCompleted(
      env, owner_, pending_write_data_->jwrite_buffer_list,
      pending_write_data_->jwrite_buffer_pos_list,
      pending_write_data_->jwrite_buffer_limit_list,
      pending_write_data_->jwrite_end_of_stream);
  // Free the java objects. This lets the Java ByteBuffers be freed, if the
  // embedder releases it, too.
  pending_write_data_.reset();
}

void CronetBidirectionalStreamAdapter::OnTrailersReceived(
    const quiche::HttpHeaderBlock& response_trailers) {
  DCHECK(context_->IsOnNetworkThread());
  JNIEnv* env = base::android::AttachCurrentThread();
  cronet::Java_CronetBidirectionalStream_onResponseTrailersReceived(
      env, owner_, GetHeadersArray(env, response_trailers));
}

void CronetBidirectionalStreamAdapter::OnFailed(int error) {
  DCHECK(context_->IsOnNetworkThread());
  stream_failed_ = true;
  JNIEnv* env = base::android::AttachCurrentThread();
  net::NetErrorDetails net_error_details;
  bidi_stream_->PopulateNetErrorDetails(&net_error_details);
  cronet::Java_CronetBidirectionalStream_onError(
      env, owner_, NetErrorToUrlRequestError(error), error,
      net_error_details.quic_connection_error,
      (int)NetSourceToJavaSource(net_error_details.source),
      ConvertUTF8ToJavaString(env, net::ErrorToString(error)),
      bidi_stream_->GetTotalReceivedBytes());
}

void CronetBidirectionalStreamAdapter::StartOnNetworkThread(
    std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info) {
  DCHECK(context_->IsOnNetworkThread());
  DCHECK(!bidi_stream_);

  request_info->detect_broken_connection =
      context_->cronet_url_request_context()
          ->bidi_stream_detect_broken_connection();
  request_info->heartbeat_interval =
      context_->cronet_url_request_context()->heartbeat_interval();
  request_info->extra_headers.SetHeaderIfMissing(
      net::HttpRequestHeaders::kUserAgent,
      context_->GetURLRequestContext(network_)
          ->http_user_agent_settings()
          ->GetUserAgent());
  bidi_stream_.reset(
      new net::BidirectionalStream(std::move(request_info),
                                   context_->GetURLRequestContext(network_)
                                       ->http_transaction_factory()
                                       ->GetSession(),
                                   send_request_headers_automatically_, this));
}

void CronetBidirectionalStreamAdapter::SendRequestHeadersOnNetworkThread() {
  DCHECK(context_->IsOnNetworkThread());
  DCHECK(!send_request_headers_automatically_);

  if (stream_failed_) {
    // If stream failed between the time when SendRequestHeaders is invoked and
    // SendRequestHeadersOnNetworkThread is executed, do not call into
    // |bidi_stream_| since the underlying stream might have been destroyed.
    // Do not invoke Java callback either, since onError is posted when
    // |stream_failed_| is set to true.
    return;
  }
  bidi_stream_->SendRequestHeaders();
}

void CronetBidirectionalStreamAdapter::ReadDataOnNetworkThread(
    scoped_refptr<IOBufferWithByteBuffer> read_buffer,
    int buffer_size) {
  DCHECK(context_->IsOnNetworkThread());
  DCHECK(read_buffer);
  DCHECK(!read_buffer_);

  read_buffer_ = read_buffer;

  int bytes_read = bidi_stream_->ReadData(read_buffer_.get(), buffer_size);
  // If IO is pending, wait for the BidirectionalStream to call OnDataRead.
  if (bytes_read == net::ERR_IO_PENDING)
    return;

  if (bytes_read < 0) {
    OnFailed(bytes_read);
    return;
  }
  OnDataRead(bytes_read);
}

void CronetBidirectionalStreamAdapter::WritevDataOnNetworkThread(
    std::unique_ptr<PendingWriteData> pending_write_data) {
  DCHECK(context_->IsOnNetworkThread());
  DCHECK(pending_write_data);
  DCHECK(!pending_write_data_);

  if (stream_failed_) {
    // If stream failed between the time when WritevData is invoked and
    // WritevDataOnNetworkThread is executed, do not call into |bidi_stream_|
    // since the underlying stream might have been destroyed. Do not invoke
    // Java callback either, since onError is posted when |stream_failed_| is
    // set to true.
    return;
  }

  pending_write_data_ = std::move(pending_write_data);
  bool end_of_stream = pending_write_data_->jwrite_end_of_stream == JNI_TRUE;
  bidi_stream_->SendvData(pending_write_data_->write_buffer_list,
                          pending_write_data_->write_buffer_len_list,
                          end_of_stream);
}

void CronetBidirectionalStreamAdapter::DestroyOnNetworkThread(
    bool send_on_canceled) {
  DCHECK(context_->IsOnNetworkThread());
  if (send_on_canceled) {
    JNIEnv* env = base::android::AttachCurrentThread();
    cronet::Java_CronetBidirectionalStream_onCanceled(env, owner_);
  }
  MaybeReportMetrics();
  delete this;
}

base::android::ScopedJavaLocalRef<jobjectArray>
CronetBidirectionalStreamAdapter::GetHeadersArray(
    JNIEnv* env,
    const quiche::HttpHeaderBlock& header_block) {
  DCHECK(context_->IsOnNetworkThread());

  std::vector<std::string> headers;
  for (const auto& header : header_block) {
    auto value = std::string(header.second);
    size_t start = 0;
    size_t end = 0;
    // The do loop will split headers by '\0' so that applications can skip it.
    do {
      end = value.find('\0', start);
      std::string split_value;
      if (end != value.npos) {
        split_value = value.substr(start, end - start);
      } else {
        split_value = value.substr(start);
      }
      headers.push_back(std::string(header.first));
      headers.push_back(split_value);
      start = end + 1;
    } while (end != value.npos);
  }
  return base::android::ToJavaArrayOfStrings(env, headers);
}

void CronetBidirectionalStreamAdapter::MaybeReportMetrics() {
  if (!bidi_stream_)
    return;
  net::LoadTimingInfo load_timing_info;
  bidi_stream_->GetLoadTimingInfo(&load_timing_info);
  JNIEnv* env = base::android::AttachCurrentThread();
  base::Time start_time = load_timing_info.request_start_time;
  base::TimeTicks start_ticks = load_timing_info.request_start;
  net::NetErrorDetails net_error_details;
  bidi_stream_->PopulateNetErrorDetails(&net_error_details);
  cronet::Java_CronetBidirectionalStream_onMetricsCollected(
      env, owner_,
      metrics_util::ConvertTime(start_ticks, start_ticks, start_time),
      metrics_util::ConvertTime(
          load_timing_info.connect_timing.domain_lookup_start, start_ticks,
          start_time),
      metrics_util::ConvertTime(
          load_timing_info.connect_timing.domain_lookup_end, start_ticks,
          start_time),
      metrics_util::ConvertTime(load_timing_info.connect_timing.connect_start,
                                start_ticks, start_time),
      metrics_util::ConvertTime(load_timing_info.connect_timing.connect_end,
                                start_ticks, start_time),
      metrics_util::ConvertTime(load_timing_info.connect_timing.ssl_start,
                                start_ticks, start_time),
      metrics_util::ConvertTime(load_timing_info.connect_timing.ssl_end,
                                start_ticks, start_time),
      metrics_util::ConvertTime(load_timing_info.send_start, start_ticks,
                                start_time),
      metrics_util::ConvertTime(load_timing_info.send_end, start_ticks,
                                start_time),
      metrics_util::ConvertTime(load_timing_info.push_start, start_ticks,
                                start_time),
      metrics_util::ConvertTime(load_timing_info.push_end, start_ticks,
                                start_time),
      metrics_util::ConvertTime(load_timing_info.receive_headers_end,
                                start_ticks, start_time),
      metrics_util::ConvertTime(base::TimeTicks::Now(), start_ticks,
                                start_time),
      load_timing_info.socket_reused, bidi_stream_->GetTotalSentBytes(),
      bidi_stream_->GetTotalReceivedBytes(),
      net_error_details.quic_connection_migration_attempted ? JNI_TRUE
                                                            : JNI_FALSE,
      net_error_details.quic_connection_migration_successful ? JNI_TRUE
                                                             : JNI_FALSE);
}

}  // namespace cronet