chromium/tools/traffic_annotation/scripts/auditor/util.py

# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import platform
import re
import sys

from functools import reduce
from itertools import chain
from google.protobuf import text_format
from google.protobuf.descriptor import FieldDescriptor
from google.protobuf.message import Message
from pathlib import Path
from typing import NewType, Any, Optional, List, Iterable

# Distinct static types for an annotation's unique id and for its hash code.
# NewType only matters to type checkers: at runtime UniqueId(x) and
# HashCode(x) return x unchanged (a plain str / int).
UniqueId = NewType("UniqueId", str)
HashCode = NewType("HashCode", int)

# Configure logging with timestamp, log level, filename, and line number.
# NOTE: this runs at import time and configures the root logger, so it applies
# to the whole process (basicConfig() does nothing if the root logger already
# has handlers).
logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s:%(levelname)s:%(filename)s(%(lineno)d)] %(message)s")
# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)


def import_compiled_proto(build_path) -> Any:
  """Imports the compiled traffic annotation proto from a build directory.

  This can't be a top-level import because the module's location depends on
  |build_path|: it is generated as a part of compiling Chrome, under
  ${build_path}/pyproto/chrome/browser/privacy/.

  As a side effect, this binds two module-level globals:
    traffic_annotation_pb2: the compiled proto module.
    traffic_annotation: its NetworkTrafficAnnotation class, used for
      accessing enum constants.

  Args:
    build_path: Path (or str) of the build directory.

  Returns:
    The imported traffic_annotation_pb2 module.

  Raises:
    ImportError: If the compiled proto can't be imported from |build_path|.
  """
  global traffic_annotation_pb2
  global traffic_annotation

  # Accept plain strings as well as Path objects.
  proto_path = Path(build_path) / "pyproto" / "chrome" / "browser" / "privacy"
  proto_dir = str(proto_path)

  # Keep the proto directory at the front of sys.path, without piling up
  # duplicate entries when this function is called more than once.
  if proto_dir in sys.path:
    sys.path.remove(proto_dir)
  sys.path.insert(0, proto_dir)

  try:
    import traffic_annotation_pb2
    # Used for accessing enum constants.
    from traffic_annotation_pb2 import NetworkTrafficAnnotation as \
      traffic_annotation
    return traffic_annotation_pb2
  except ImportError:
    logger.critical(
      "Failed to import the compiled traffic annotation proto. Make sure "
      "you're on Linux or Windows and Chrome is built in '{}' before "
      "running this script.".format(build_path))
    raise


def get_current_platform(build_path: Optional[Path] = None) -> str:
  """Guess the platform targeted by |build_path|.

  The lowercased host platform name is the answer in most cases. On a Linux
  host with a |build_path|, ${build_path}/args.gn is searched for a
  target_os="android" or target_os="chromeos" line; if one is found, that
  target OS is returned instead.

  Args:
    build_path: Build directory to inspect, or None to skip the args.gn check.

  Returns:
    The platform name, e.g. "linux", "windows", "android" or "chromeos".
  """
  host_platform: str = platform.system().lower()

  # The args.gn lookup is only attempted on Linux hosts with a build directory.
  if host_platform != "linux" or build_path is None:
    return host_platform

  try:
    args_gn_text = (build_path / "args.gn").read_text(encoding="utf-8")
    target_os_match = re.search(
        r"^\s*target_os\s*=\s*\"(android|chromeos)\"\s*$", args_gn_text,
        re.MULTILINE)
  except (ValueError, OSError) as e:
    # Maybe the file's absent, or it can't be decoded as UTF-8, or something.
    # It's probably not Android/ChromeOS in that case.
    logger.info(e)
    return host_platform

  if target_os_match:
    return target_os_match.group(1)
  return host_platform


def twos_complement_8bit(b: int) -> int:
  """Interprets b like a signed 8-bit integer, possibly changing its sign.

  For instance, twos_complement_8bit(204) returns -52.

  Args:
    b: An unsigned byte value, in the range [0, 255].

  Returns:
    The value of b reinterpreted as a signed 8-bit integer, in [-128, 127].

  Raises:
    ValueError: If b is outside [0, 255]. Negative inputs are rejected too,
      since they are not valid unsigned bytes.
  """
  if not 0 <= b <= 0xFF:
    raise ValueError("b must fit inside 8 bits")
  if b & 0x80:
    # Sign bit is set: negative number, calculate its value using
    # two's-complement.
    return b - 0x100
  # Positive number, do not touch.
  return b


def iterative_hash(s: str) -> HashCode:
  """Compute the hash code of the given string as in:
  net/traffic_annotation/network_traffic_annotation.h

  The string is encoded as UTF-8 and each byte, read as a signed 8-bit value,
  is folded into the running hash as (hash * 31 + byte) % 138003713.

  Args:
    s: str
      The seed, e.g. unique id of traffic annotation.
  Returns: int
    A hash code.
  """
  hash_value = 0
  for byte in s.encode("utf-8"):
    hash_value = (hash_value * 31 + twos_complement_8bit(byte)) % 138003713
  return HashCode(hash_value)


def compute_hash_value(text: str) -> HashCode:
  """Returns iterative_hash(text), or -1 when |text| is empty."""
  if not text:
    return HashCode(-1)
  return iterative_hash(text)


def merge_string_field(src: Message, dst: Message, field: str):
  """Merges the string field named |field| from |src| into |dst|.

  If |src|'s value is empty, |dst| is left untouched. If |dst|'s value is
  empty, it receives |src|'s value. Otherwise |dst| ends up with |src|'s
  value, a newline, then its own previous value.
  """
  incoming = getattr(src, field)
  if not incoming:
    return

  existing = getattr(dst, field)
  merged = "{}\n{}".format(incoming, existing) if existing else incoming
  setattr(dst, field, merged)


def fill_proto_with_bogus(unique_id: str, proto: Message,
                          field_numbers: List[int]):
  """Puts placeholder values into the |proto| fields named by field_numbers.

  The field types are discovered through the proto's descriptor. The absolute
  value of each field number is used, so negative numbers are accepted.

  Args:
    unique_id: Annotation id, only used in error messages.
    proto: The message to modify in place.
    field_numbers: Numbers of the fields to fill.

  Raises:
    ValueError: If a number doesn't match any field of the proto.
    NotImplementedError: If a field has a type/label combination that isn't
      handled here.
  """
  message_descriptor = proto.DESCRIPTOR
  fields_by_number = message_descriptor.fields_by_number

  for number in map(abs, field_numbers):
    if number not in fields_by_number:
      raise ValueError("{} is not a valid {} field".format(
          number, message_descriptor.name))

    field = fields_by_number[number]
    is_repeated = field.label == FieldDescriptor.LABEL_REPEATED

    if field.type == FieldDescriptor.TYPE_MESSAGE:
      # A repeated message gets one new element; a non-repeated message needs
      # nothing.
      if is_repeated:
        getattr(proto, field.name).add()
      continue

    if not is_repeated and field.type == FieldDescriptor.TYPE_STRING:
      setattr(proto, field.name, "[Archived]")
      continue

    if not is_repeated and field.type == FieldDescriptor.TYPE_ENUM:
      # Assume the 2nd value in the enum is reasonable, since the 1st is
      # UNSPECIFIED.
      setattr(proto, field.name, field.enum_type.values[1].number)
      continue

    raise NotImplementedError(
        "Unimplemented proto field {} of type {} ({}) in {}".format(
            field.name, field.type,
            "repeated" if is_repeated else "non-repeated", unique_id))


def extract_annotation_id(line: str) -> Optional[UniqueId]:
  """Returns the annotation id given an '<item id=...' line.

  The id is the non-empty text between the double quotes of the first
  id="..." found in |line|. Returns None if there is no such text."""
  id_match = re.search('id="([^"]+)"', line)
  if id_match is None:
    return None
  return UniqueId(id_match.group(1))


def escape_for_tsv(text: str) -> str:
  """Changes double-quotes to single-quotes, and adds double-quotes around the
  text if it has newlines/tabs.

  Args:
    text: The cell content to escape.

  Returns:
    The escaped text, safe to use as a single TSV cell.
  """
  # str.replace() returns a new string (strings are immutable), so its result
  # has to be kept.
  text = text.replace("\"", "'")
  if "\n" in text or "\t" in text:
    return "\"{}\"".format(text)
  return text


def policy_to_text(chrome_policy: Iterable[Message]) -> str:
  """Unnests the policy name/values from chrome_policy, producing a
  human-readable string.

  For example, this:
    chrome_policy {
      SyncDisabled {
        policy_options {
          mode: MANDATORY
        }
        SyncDisabled: true
      }
    }

  becomes this:
    SyncDisabled: true

  Args:
    chrome_policy: The policy messages to describe.

  Returns:
    One entry per policy value (policy_options fields are skipped), joined
    with ", ". Empty string if there is nothing to print.
  """
  items = []
  # Use the protobuf serializer library to print the fields, 2 levels deep.
  for policy in chrome_policy:
    for _, value in policy.ListFields():
      for subfield, subvalue in value.ListFields():
        if subfield.name == "policy_options":
          # Skip the policy_options field.
          continue

        # text_format.PrintField needs repeated fields passed in
        # one-at-a-time, so treat a singular field as a 1-element sequence.
        if subfield.label == FieldDescriptor.LABEL_REPEATED:
          values = subvalue
        else:
          values = [subvalue]

        writer = text_format.TextWriter(as_utf8=True)
        for single_value in values:
          text_format.PrintField(subfield,
                                 single_value,
                                 writer,
                                 as_one_line=True,
                                 use_short_repeated_primitives=True)
        items.append(writer.getvalue().strip())
  return ", ".join(items)


def write_annotations_tsv_file(file_path: Path, annotations: List["Annotation"],
                               missing_ids: List[UniqueId]):
  """Writes a TSV file of all annotations and their contents in file_path.

  Only annotations whose type is "definition" get a row. Each id in
  |missing_ids| gets a row holding just that id, followed by empty cells. The
  rows are sorted, and the header line comes first.

  import_compiled_proto() must have been called beforehand: this function
  reads the module-level |traffic_annotation| global that it sets.

  Args:
    file_path: Where to write the TSV file (UTF-8).
    annotations: The annotations to dump.
    missing_ids: Ids to list with empty contents.

  Raises:
    ValueError: If an annotation's semantics.destination has no known name.
  """
  logger.info("Saving annotations to TSV file: {}.".format(file_path))
  Destination = traffic_annotation.TrafficSemantics.Destination
  CookiesAllowed = traffic_annotation.TrafficPolicy.CookiesAllowed

  # These don't depend on the annotation, so build them once rather than once
  # per row.
  destination_names = {
      Destination.WEBSITE: "Website",
      Destination.GOOGLE_OWNED_SERVICE: "Google",
      Destination.LOCAL: "Local",
      Destination.PROXIED_GOOGLE_OWNED_SERVICE: "Proxied to Google",
      Destination.OTHER: "Other",
  }
  code_search_link = "https://cs.chromium.org/chromium/src/"

  title = ("Unique ID\tLast Update\tSender\tDescription\tTrigger\tData\t"
           "Destination\tCookies Allowed\tCookies Store\tSetting\t"
           "Chrome Policy\tComments\tSource File")
  column_count = title.count("\t")

  # Missing ids get a row with only the first cell filled in.
  lines = [missing_id + "\t" * column_count for missing_id in missing_ids]

  for annotation in annotations:
    if annotation.type.value != "definition":
      continue

    proto = annotation.proto
    semantics = proto.semantics
    policy = proto.policy
    source = proto.source

    # Destination.
    if (semantics.destination == Destination.OTHER
        and semantics.destination_other):
      destination_text = "Other: {}".format(semantics.destination_other)
    elif semantics.destination in destination_names:
      destination_text = destination_names[semantics.destination]
    else:
      raise ValueError(
          "Invalid value for the semantics.destination field: {}".format(
              semantics.destination))

    # Chrome policies.
    if annotation.has_policy():
      policies_text = policy_to_text(
          chain(policy.chrome_policy, policy.chrome_device_policy))
    else:
      policies_text = policy.policy_exception_justification

    # One entry per column, in the same order as |title|.
    columns = [
        proto.unique_id,
        # Placeholder for Last Update Date, will be updated in the scripts.
        "",
        # Semantics.
        semantics.sender,
        escape_for_tsv(semantics.description),
        escape_for_tsv(semantics.trigger),
        escape_for_tsv(semantics.data),
        destination_text,
        # Policy.
        "Yes" if policy.cookies_allowed == CookiesAllowed.YES else "No",
        escape_for_tsv(policy.cookies_store),
        escape_for_tsv(policy.setting),
        escape_for_tsv(policies_text),
        # Comments.
        escape_for_tsv(proto.comments),
        # Source.
        "{}{}?l={}".format(code_search_link, source.file, source.line),
    ]
    lines.append("\t".join(str(column) for column in columns))

  lines.sort()
  lines.insert(0, title)
  report = "\n".join(lines) + "\n"

  file_path.write_text(report, encoding="utf-8")