chromium/tools/perf/core/tbmv3/trace_processor_unittest.py

# Copyright 2019 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
import shutil
import tempfile
import unittest
from unittest import mock

from core.tbmv3 import trace_processor

RUN_METHOD = 'core.tbmv3.trace_processor._RunTraceProcessor'


class TraceProcessorTestCase(unittest.TestCase):

  def setUp(self):
    self.temp_dir = tempfile.mkdtemp()
    self.trace_path = self.CreateEmptyProtoTrace()
    self.tp_path = os.path.join(self.temp_dir, 'trace_processor_shell')
    with open(self.tp_path, 'w'):
      pass
    with open(os.path.join(self.temp_dir, 'dummy_metric.sql'), 'w'):
      pass
    with open(os.path.join(self.temp_dir, 'dummy_metric.proto'), 'w'):
      pass

  def tearDown(self):
    shutil.rmtree(self.temp_dir)

  def CreateEmptyProtoTrace(self):
    """Create an empty file as a proto trace."""
    with tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                     delete=False) as trace_file:
      # Open temp file and close it so it's written to disk.
      pass
    return trace_file.name

  def testConvertProtoTraceToJson(self):
    with mock.patch(RUN_METHOD):
      trace_processor.ConvertProtoTraceToJson(self.tp_path, '/path/to/proto',
                                              '/path/to/json')

  def testRunMetricNoRepeated(self):
    metric_output = """
    {
      "perfetto.protos.dummy_metric": {
        "foo": 7,
        "bar": 8
       },
      "__annotations": {
        "perfetto.protos.dummy_metric": {
          "foo": {
            "__field_options": {
              "unit": "count_biggerIsBetter"
            }
          }
        }
      }
    }"""

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = metric_output
        histograms = trace_processor.RunMetric(self.tp_path, '/path/to/proto',
                                               'dummy_metric')

    foo_hist = histograms.GetHistogramNamed('dummy::foo')
    self.assertEqual(foo_hist.unit, 'count_biggerIsBetter')
    self.assertEqual(foo_hist.sample_values, [7])

    bar_hists = histograms.GetHistogramsNamed('dummy::bar')
    self.assertEqual(len(bar_hists), 0)

  def testRunMetricRepeated(self):
    metric_output = """
    {
      "perfetto.protos.dummy_metric": {
        "foo": [4, 5, 6],
        "bar": [{"baz": 10}, {"baz": 11}]
       },
      "__annotations": {
        "perfetto.protos.dummy_metric": {
          "foo": {
            "__repeated": true,
            "__field_options": {
              "unit": "count_biggerIsBetter"
            }
          },
          "bar": {
            "__repeated": true,
            "baz": {
              "__field_options": {
                "unit": "ms_smallerIsBetter"
              }
            }
          }
        }
      }
    }"""

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = metric_output
        histograms = trace_processor.RunMetric(self.tp_path, '/path/to/proto',
                                               'dummy_metric')

    foo_hist = histograms.GetHistogramNamed('dummy::foo')
    self.assertEqual(foo_hist.unit, 'count_biggerIsBetter')
    self.assertEqual(foo_hist.sample_values, [4, 5, 6])

    baz_hist = histograms.GetHistogramNamed('dummy::bar:baz')
    self.assertEqual(baz_hist.unit, 'ms_smallerIsBetter')
    self.assertEqual(baz_hist.sample_values, [10, 11])

  def testMarkedRepeatedButValueIsNotList(self):
    metric_output = """
    {
      "perfetto.protos.dummy_metric": {
        "foo": 4
       },
      "__annotations": {
        "perfetto.protos.dummy_metric": {
          "foo": {
            "__repeated": true,
            "__field_options": {
              "unit": "count_biggerIsBetter"
            }
          }
        }
      }
    }"""

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = metric_output
        with self.assertRaises(trace_processor.InvalidTraceProcessorOutput):
          trace_processor.RunMetric(self.tp_path, '/path/to/proto',
                                    'dummy_metric')

  def testMarkedNotRepeatedButValueIsList(self):
    metric_output = """
    {
      "perfetto.protos.dummy_metric": {
        "foo": [1, 2, 3]
       },
      "__annotations": {
        "perfetto.protos.dummy_metric": {
          "foo": {
            "__field_options": {
              "unit": "count_biggerIsBetter"
            }
          }
        }
      }
    }"""

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = metric_output
        with self.assertRaises(trace_processor.InvalidTraceProcessorOutput):
          trace_processor.RunMetric(self.tp_path, '/path/to/proto',
                                    'dummy_metric')

  def testRunMetricEmpty(self):
    metric_output = '{}'

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = metric_output
        # Checking that this doesn't throw errors.
        trace_processor.RunMetric(self.tp_path, '/path/to/proto',
                                  'dummy_metric')

  def testRunMetricNoAnnotations(self):
    metric_output = '{"perfetto.protos.foo": {"bar": 42}}'

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = metric_output
        # Checking that this doesn't throw errors.
        trace_processor.RunMetric(self.tp_path, '/path/to/proto',
                                  'dummy_metric')

  def testRunMetricFetchPowerProfile(self):
    FETCH_METHOD = (
        'core.tbmv3.trace_processor.binary_deps_manager.FetchDataFile')
    metric_output = '{}'

    with mock.patch(FETCH_METHOD) as fetch_patch:
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = metric_output
        # Checking that this doesn't throw errors.
        trace_processor.RunMetric(self.tp_path,
                                  '/path/to/proto',
                                  'dummy_metric',
                                  fetch_power_profile=True)

    self.assertEqual(fetch_patch.call_count, 1)
    self.assertIn('--pre-metrics', run_patch.call_args[0])

  def testRunQueryEmptyInput(self):
    sql_query = 'SELECT name, ts FROM slice ORDER BY ts LIMIT 1'
    sql_output = ''

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = sql_output
        query_output = trace_processor.RunQuery(self.tp_path, '/path/to/proto',
                                                sql_query)

    expected_output = []
    self.assertEqual(query_output, expected_output)

  def testRunQueryGarbageInput(self):
    sql_query = 'SELECT name, ts FROM slice ORDER BY ts LIMIT 1'
    sql_output = """adsfsdfdsaf"""

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = sql_output
        query_output = trace_processor.RunQuery(self.tp_path, '/path/to/proto',
                                                sql_query)

    expected_output = []
    self.assertEqual(query_output, expected_output)

  def testRunQueryNoRows(self):
    sql_query = 'SELECT name, ts FROM slice ORDER BY ts LIMIT 1'
    sql_output = """"ts","ts+dur"\n\n\n\n\n\n\n\n\n\n\n\n"""

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = sql_output
        query_output = trace_processor.RunQuery(self.tp_path, '/path/to/proto',
                                                sql_query)

    expected_output = []
    self.assertEqual(query_output, expected_output)

  def testRunQueryMultipleRows(self):
    sql_query = 'SELECT name, ts FROM slice ORDER BY ts LIMIT 3'
    sql_output = '"name","ts"\n' +\
                '"foo",123\n' +\
                '"bar",468\n' +\
                '"bas",432\n'

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = sql_output
        query_output = trace_processor.RunQuery(self.tp_path, '/path/to/proto',
                                                sql_query)

    expected_output = [{
        'name': 'foo',
        'ts': '123'
    }, {
        'name': 'bar',
        'ts': '468'
    }, {
        'name': 'bas',
        'ts': '432'
    }]
    self.assertEqual(query_output, expected_output)

  def testRunQueryDatatypes(self):
    sql_query = 'SELECT name,arg_set_id,ts/100.0,track_id=2 FROM slice LIMIT 3'
    sql_output = '"name","arg_set_id","ts/100.0","track_id=2"\n' +\
                '"bar",3,59.82,0\n' +\
                '"foo",2,37.14,1\n' +\
                '"bas",5,92.01,0\n'

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = sql_output
        query_output = trace_processor.RunQuery(self.tp_path, '/path/to/proto',
                                                sql_query)

    expected_output = [{
        'track_id=2': '0',
        'ts/100.0': '59.82',
        'name': 'bar',
        'arg_set_id': '3'
    }, {
        'track_id=2': '1',
        'ts/100.0': '37.14',
        'name': 'foo',
        'arg_set_id': '2'
    }, {
        'track_id=2': '0',
        'ts/100.0': '92.01',
        'name': 'bas',
        'arg_set_id': '5'
    }]
    self.assertEqual(query_output, expected_output)

  def testRunQueryDatatypeNull(self):
    sql_query = 'SELECT int_value, str_value FROM metadata LIMIT 2'
    sql_output = '"int_value","str_value"\n' +\
                '"[NULL]","Darwin"\n' +\
                '"[NULL]","Linux"\n'

    with mock.patch('core.tbmv3.trace_processor.METRICS_PATH', self.temp_dir):
      with mock.patch(RUN_METHOD) as run_patch:
        run_patch.return_value = sql_output
        query_output = trace_processor.RunQuery(self.tp_path, '/path/to/proto',
                                                sql_query)

    expected_output = [{
        'int_value': None,
        'str_value': 'Darwin'
    }, {
        'int_value': None,
        'str_value': 'Linux'
    }]
    self.assertEqual(query_output, expected_output)

  def testRunQueryNoPatch(self):
    sql_query = 'SELECT int_value, str_value FROM metadata LIMIT 1'
    try:
      trace_processor.RunQuery(None, self.trace_path, sql_query)
    except Exception as error:
      self.fail('Unexpected trace_processor error: {}'.format(error))

  def testWithInterferingEnvironmentVariables(self):
    os.environ['PERFETTO_SYMBOLIZER_MODE'] = 'placeholder'
    os.environ['PERFETTO_BINARY_PATH'] = 'placeholder'

    sql_query = 'SELECT int_value, str_value FROM metadata LIMIT 1'
    try:
      trace_processor.RunQuery(None, self.trace_path, sql_query)
    except Exception as error:
      self.fail('Unexpected trace_processor error: {}'.format(error))
    finally:
      os.environ.pop('PERFETTO_BINARY_PATH', None)
      os.environ.pop('PERFETTO_SYMBOLIZER_MODE', None)