chromium/tools/metrics/ukm/ukm_model.py

# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# """Model objects for ukm.xml contents."""

import os
import sys

sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'common'))
import models
import model_shared


_ENUMERATION_TYPE = models.ObjectNodeType(
    'enumeration',
    attributes=[],
    single_line=True)

_QUANTILES_TYPE = models.ObjectNodeType(
    'quantiles',
    attributes=[
      ('type', str, None),
    ],
    single_line=True)

_INDEX_TYPE = models.ObjectNodeType(
    'index',
    attributes=[
      ('fields', str, None),
    ],
    single_line=True)

_STATISTICS_TYPE = models.ObjectNodeType(
    'statistics',
    attributes=[
        ('export', str, r'(?i)^(|true|false)$'),
    ],
    children=[
        models.ChildType(_QUANTILES_TYPE.tag, _QUANTILES_TYPE, multiple=False),
        models.ChildType(_ENUMERATION_TYPE.tag,
                         _ENUMERATION_TYPE,
                         multiple=False),
    ])

_HISTORY_TYPE =  models.ObjectNodeType(
    'history',
    attributes=[],
    alphabetization=[
        (_INDEX_TYPE.tag, model_shared._LOWERCASE_FN('fields')),
        (_STATISTICS_TYPE.tag, model_shared._KEEP_ORDER),
    ],
    children=[
        models.ChildType(_INDEX_TYPE.tag, _INDEX_TYPE, multiple=True),
        models.ChildType(_STATISTICS_TYPE.tag, _STATISTICS_TYPE, multiple=True),
    ])

_AGGREGATION_TYPE =  models.ObjectNodeType(
    'aggregation',
    attributes=[],
    children=[
        models.ChildType(_HISTORY_TYPE.tag, _HISTORY_TYPE, multiple=False),
    ])

_METRIC_TYPE =  models.ObjectNodeType(
    'metric',
    attributes=[
      ('name', str, r'^[A-Za-z][A-Za-z0-9_.]*$'),
      ('semantic_type', str, None),
      ('enum', str, None),
    ],
    alphabetization=[
        (model_shared._OBSOLETE_TYPE.tag, model_shared._KEEP_ORDER),
        (model_shared._OWNER_TYPE.tag, model_shared._KEEP_ORDER),
        (model_shared._SUMMARY_TYPE.tag, model_shared._KEEP_ORDER),
        (_AGGREGATION_TYPE.tag, model_shared._KEEP_ORDER),
    ],
    children=[
        models.ChildType(
          model_shared._OBSOLETE_TYPE.tag, model_shared._OBSOLETE_TYPE, \
            multiple=False),
        models.ChildType(
          model_shared._OWNER_TYPE.tag, model_shared._OWNER_TYPE, \
            multiple=True),
        models.ChildType(
          model_shared._SUMMARY_TYPE.tag, model_shared._SUMMARY_TYPE, \
            multiple=False),
        models.ChildType(
          _AGGREGATION_TYPE.tag, _AGGREGATION_TYPE, \
            multiple=True),
    ])

_EVENT_TYPE = models.ObjectNodeType(
    'event',
    attributes=[
        ('name', str, r'^[A-Za-z][A-Za-z0-9.]*$'),
        ('singular', str, r'(?i)^(|true|false)$'),
        # This event will be omitted from the generated readable_event.proto
        # file if skip_proto_reason is a non-empty string.
        ('skip_proto_reason', str, None),
    ],
    alphabetization=[
        (model_shared._OBSOLETE_TYPE.tag, model_shared._KEEP_ORDER),
        (model_shared._OWNER_TYPE.tag, model_shared._KEEP_ORDER),
        (model_shared._SUMMARY_TYPE.tag, model_shared._KEEP_ORDER),
        (_METRIC_TYPE.tag, model_shared._LOWERCASE_FN('name')),
    ],
    extra_newlines=(1, 1, 1),
    children=[
        models.ChildType(
          model_shared._OBSOLETE_TYPE.tag, model_shared._OBSOLETE_TYPE, \
            multiple=False),
        models.ChildType(
          model_shared._OWNER_TYPE.tag, model_shared._OWNER_TYPE, \
            multiple=True),
        models.ChildType(
          model_shared._SUMMARY_TYPE.tag, model_shared._SUMMARY_TYPE, \
            multiple=False),
        models.ChildType(
          _METRIC_TYPE.tag, _METRIC_TYPE, \
            multiple=True),
    ])

_UKM_CONFIGURATION_TYPE = models.ObjectNodeType(
    'ukm-configuration',
    alphabetization=[(_EVENT_TYPE.tag, model_shared._LOWERCASE_FN('name'))],
    extra_newlines=(2, 1, 1),
    indent=False,
    children=[
        models.ChildType(_EVENT_TYPE.tag, _EVENT_TYPE, multiple=True),
    ])

UKM_XML_TYPE = models.DocumentType(_UKM_CONFIGURATION_TYPE)


def PrettifyXmlAndTrimObsolete(original_xml: str) -> str:
  """Parses the original XML and returns a pretty-printed version.

  This also removes any events and metrics tagged as <obsolete>, and prints
  a warning if applicable.

  Args:
    original_xml: A string containing the original XML file contents.

  Returns:
    A pretty-printed XML string, or None if the config contains errors.
  """
  config = UKM_XML_TYPE.Parse(original_xml)
  event_tag = _EVENT_TYPE.tag
  metric_tag = _METRIC_TYPE.tag
  all_event_names = {event['name'] for event in config[event_tag]}
  config[event_tag] = list(filter(IsNotObsolete, config[event_tag]))
  non_obsolete_event_names = {event['name'] for event in config[event_tag]}
  if all_event_names != non_obsolete_event_names:
    print(f'Events {str(all_event_names - non_obsolete_event_names)} are'
          ' tagged as <obsolete>; deleting their definition as per'
          ' go/ukm-api#obsolete-events.\n')
  for event in config[event_tag]:
    all_metrics_names = {metric['name'] for metric in event[metric_tag]}
    event[metric_tag] = list(filter(IsNotObsolete, event[metric_tag]))
    non_obsolete_metrics_names = {
        metric['name']
        for metric in event[metric_tag]
    }
    if all_metrics_names != non_obsolete_metrics_names:
      event_name = event['name']
      print(f'Under the event {event_name!r}, metrics'
            f' {str(all_metrics_names - non_obsolete_metrics_names)} are'
            ' tagged as <obsolete>; deleting their definition as per'
            ' go/ukm-api#obsolete-events.\n')

  return UKM_XML_TYPE.PrettyPrint(config)


def IsNotObsolete(node):
  """Checks if the given node contains a child <obsolete> node."""
  return model_shared._OBSOLETE_TYPE.tag not in node