chromium/chrome/test/enterprise/e2e/connector/common/histogram/util.py

# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import json
import socket
from typing import Dict, List, Optional, Set

from absl import logging
from attr import asdict
from attr import define
from attr import Factory
from selenium import webdriver


@define
class HistogramDataBucket:
  """Maps to the json:"buckets" values inside HistogramData.

  This class allows for the parsing of the "buckets" json response when using
  histogram collection in Chrome. For more information on histogram collection,
  see http://shortn/_Oar7S8pRLF.

  Attributes:
    count: The "count" value in the nested "buckets" json object of the json
      response. Defaults to None.
    high: The "high" value in the nested "buckets" json object of the json
      response. Defaults to None.
    low: The "low" value in the nested "buckets" json object of the json
      response. Defaults to None.
  """
  count: int = None
  high: int = None
  low: int = None

  def __bool__(self):
    return bool(self.count or self.high or self.low)


@define
class HistogramDataParams:
  """Maps to the json:"params" inside HistogramData.

  This class allows for the parsing of the "params" within the json response
  provided when using histogram collection in Chrome. For more information on
  histogram collection, see http://shortn/_Oar7S8pRLF.

  Attributes:
    bucket_count: The "bucket_count" value in the nested "params" json of the
      json response. Defaults to None.
    max_value: The "max" value in the nested "params" json of the json response.
      Defaults to None.
    min_value: The "min" value in the nested "params" json of the json response.
      Defaults to None.
    data_type: The "type" value in the nested "params" json of the json
      response. Defaults to None.
  """
  bucket_count: float = None
  max_value: float = None
  min_value: float = None
  data_type: str = None

  def __bool__(self):
    return bool(self.bucket_count or self.max_value or self.min_value or
                self.data_type)


@define
class HistogramData:
  """Maps to the json string schema from histogram.

  This class allows for the parsing of the json response provided when using
  histogram collection in Chrome. For more information on histogram collection,
  see http://shortn/_Oar7S8pRLF. All values default to None in the case of a
  partial json response.

  Attributes:
    buckets: The "buckets" value of the json response. Defaults to None.
    count: The "count" value of the json response. Defaults to None.
    flags: The "flags" value of the json response. Defaults to None.
    name: The "name" value of the json response. Defaults to None.
    params: The "params" value of the json response. Defaults to None.
    pid: The "pid" value of the json response. Defaults to None.
    sum_value: The "sum" value of the json response. Defaults to None.
  """
  buckets: List[HistogramDataBucket] = Factory(list)
  count: float = 0.0
  flags: float = None
  name: str = None
  params: HistogramDataParams = None
  pid: float = None
  sum_value: float = None
  values: Set[int] = Factory(set)

  def isValuePresent(self, value: int) -> bool:
    return value in self.values


def parse(response: str) -> HistogramData:
  """Parses a Chrome histogram response json string into a HistogramData object.

  Args:
    response: A string representation for the json response from a call to
      statsCollectionController.getBrowserHistogram.

  Returns:
    A HistogramData object represented the json response.

  Raises:
    BaseException: If the json response isn't able to be parsed into a
     HistogramData object.
  """
  d = json.loads(response)
  buckets_list = d["buckets"] if "buckets" in d else []
  buckets = [
      HistogramDataBucket(b["count"], b["high"], b["low"]) for b in buckets_list
  ] if buckets_list else None
  count = d["count"] if "count" in d else None
  flags = d["flags"] if "flags" in d else None
  name = d["name"] if "name" in d else None
  p = d["params"] if "params" in d else None
  params = HistogramDataParams(
      p["bucket_count"] if "bucket_count" in p else 0.0,
      p["max"] if "max" in p else -1, p["min"] if "min" in p else -1,
      p["type"] if "type" in p else None) if p else None
  pid = d["pid"] if "pid" in d else None
  s = d["sum"] if "sum" in d else None
  return HistogramData(buckets, count, flags, name, params, pid, s)


def add(val1: Optional[float], val2: Optional[float]) -> Optional[float]:
  if not val1 or not val2:
    return val2 if not val1 else val1
  return val1 + val2


def to_int(val: Optional[float]) -> int:
  """Returns a rounded up integer from a byte string."""
  if not val:
    return 0
  return int(float(str(val)))


def merge_histograms(hd1: Optional[HistogramData],
                     hd2: Optional[HistogramData]) -> Optional[HistogramData]:
  """Merges two histograms by summing corresponding values.

  Args:
    hd1: A HistogramData object to be merged.
    hd2: A HistogramData object to be merged.

  Returns:
    A HistogramData object in which all corresponding values are added.
  """
  if not hd1 or not hd2:
    return hd2 if not hd1 else hd1

  logging.info(f'hd1={hd1}, hd2={hd2}')
  hd1.count = add(hd1.count, hd2.count)
  hd1.sum_value = add(hd1.sum_value, hd2.sum_value)
  # Add both sum_value to values to allow query for present query
  # sum_value is byte format, and it is like b'9.0'
  try:
    hd1.values.add(to_int(hd1.sum_value))
    hd1.values.add(to_int(hd2.sum_value))
  except e:
    logging.warning(
        f'fail to add {hd1.sum_value}, {hd2.sum_value}. err: {str(e)}')

  if not hd2.buckets:
    return hd1

  for b2 in hd2.buckets:
    found = False
    for i, b1 in enumerate(hd1.buckets):
      if b2.low == b1.low and b2.high == b1.high:
        hd1.buckets[i].count += b2.count
        found = True

    if not found:
      hd1.buckets.append(b2)

  return hd1


def histogram(cd: webdriver.chrome.webdriver.WebDriver,
              name: str,
              timeout: int = 30) -> Optional[HistogramData]:
  """Queries a particular histogram from Chrome and formats the response.

  The ChromeDriver instance provided is used to execute javascript in browser,
  generating a histogram as a json response. The response is then parsed
  and processed, ultimately returned as a HistogramData object. For more
  information on histogram collection, see http://shortn/_Oar7S8pRLF.

  Args:
    cd: A selenium.webdriver.chrome.webdriver.WebDriver instance.
    name: The name of the histogram to search for.
    timeout: The timeout for the script execution in seconds. Defaults to 30.

  Returns:
    A HistogramData object parsed from the json response.

  Raises:
    BaseException: When either histogram request returns an empty response.
  """
  resp_str = execute_script(
      cd, "return statsCollectionController.getBrowserHistogram('%s');" % name,
      timeout)
  if not resp_str:
    raise BaseException("Could not get histogram with %s" % name)

  hd1 = parse(resp_str)

  # Fetch renderer histograms as well
  resp_str = execute_script(
      cd, "return statsCollectionController.getHistogram('%s');" % name)
  if not resp_str:
    raise ValueError("Could not get second histogram with %s" % name)
  hd2 = parse(resp_str)
  return merge_histograms(hd1, hd2)


def execute_script(cd: webdriver.chrome.webdriver.WebDriver,
                   script: str,
                   timeout: int = 30) -> str:
  """Executes javascript in browser through ChromeDriver.

  Args:
    cd: A selenium.webdriver.chrome.webdriver.WebDriver instance.
    script: The script to run in the form of a string.
    timeout: The timeout to update the socket to. Defaults to 30 seconds to
      ensure adequate time for the javascript response.

  Returns:
    A str of the response data from executing the script.
  """
  default_timeout = socket.getdefaulttimeout()
  socket.setdefaulttimeout(timeout)
  logging.log(logging.DEBUG, "Set socket timeout to %s seconds", timeout)

  script_result = cd.execute_script(script)
  logging.log(logging.DEBUG, "Executed Javascript in browser: %s", script)

  socket.setdefaulttimeout(default_timeout)
  logging.log(logging.DEBUG, "Set socket timeout to default: %s",
              default_timeout)
  return script_result


def poll_histogram(cd: webdriver.chrome.webdriver.WebDriver,
                   names: List[str],
                   timeout: int = 30) -> Optional[Dict[str, HistogramData]]:
  """Polls the histograms and returns the list of non-empty histogram.

  Args:
    cd: A selenium.webdriver.chrome.webdriver.WebDriver instance.
    names: The names of the histograms to fetch.
    timeout: The timeout for each histogram fetch request in seconds. Defaults
      to 30.

  Returns:
    A dict of HistogramData object of the matched histogram.
  """
  hgs = None
  for name in names:
    hg = histogram(cd, name, timeout)
    if not hgs:
      hgs = {}
    hgs[name] = asdict(hg)

  return hgs