# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import copy
import json
import logging
import re
from test_result_util import ResultCollection, TestResult, TestStatus
LOGGER = logging.getLogger(__name__)
# These labels should match the ones output by gtest's JSON.
TEST_UNKNOWN_LABEL = 'UNKNOWN'
TEST_SUCCESS_LABEL = 'SUCCESS'
TEST_FAILURE_LABEL = 'FAILURE'
TEST_SKIPPED_LABEL = 'SKIPPED'
TEST_TIMEOUT_LABEL = 'TIMEOUT'
TEST_WARNING_LABEL = 'WARNING'
DID_NOT_COMPLETE = 'Did not complete.'
class GTestResult(object):
"""A result of gtest.
The class will be depreacated soon. Please use
|test_result_util.ResultCollection| instead. (crbug.com/1132476)
Properties:
command: The command argv.
crashed: Whether or not the test crashed.
crashed_test: The name of the test during which execution crashed, or
None if a particular test didn't crash.
failed_tests: A dict mapping the names of failed tests to a list of
lines of output from those tests.
flaked_tests: A dict mapping the names of failed flaky tests to a list
of lines of output from those tests.
passed_tests: A list of passed tests.
perf_links: A dict mapping the names of perf data points collected
to links to view those graphs.
return_code: The return code of the command.
success: Whether or not this run of the command was considered a
successful GTest execution.
"""
@property
def crashed(self):
return self._crashed
@property
def crashed_test(self):
return self._crashed_test
@property
def command(self):
return self._command
@property
def disabled_tests_from_compiled_tests_file(self):
if self.__finalized:
return copy.deepcopy(self._disabled_tests_from_compiled_tests_file)
return self._disabled_tests_from_compiled_tests_file
@property
def failed_tests(self):
if self.__finalized:
return copy.deepcopy(self._failed_tests)
return self._failed_tests
@property
def flaked_tests(self):
if self.__finalized:
return copy.deepcopy(self._flaked_tests)
return self._flaked_tests
@property
def passed_tests(self):
if self.__finalized:
return copy.deepcopy(self._passed_tests)
return self._passed_tests
@property
def perf_links(self):
if self.__finalized:
return copy.deepcopy(self._perf_links)
return self._perf_links
@property
def return_code(self):
return self._return_code
@property
def success(self):
return self._success
def __init__(self, command):
if not isinstance(command, collections.Iterable):
raise ValueError('Expected an iterable of command arguments.', command)
if not command:
raise ValueError('Expected a non-empty command.', command)
self._command = tuple(command)
self._crashed = False
self._crashed_test = None
self._disabled_tests_from_compiled_tests_file = []
self._failed_tests = collections.OrderedDict()
self._flaked_tests = collections.OrderedDict()
self._passed_tests = []
self._perf_links = collections.OrderedDict()
self._return_code = None
self._success = None
self.__finalized = False
def finalize(self, return_code, success):
self._return_code = return_code
self._success = success
# If the test was not considered to be a GTest success, but had no
# failing tests, conclude that it must have crashed.
if not self._success and not self._failed_tests and not self._flaked_tests:
self._crashed = True
# At most one test can crash the entire app in a given parsing.
for test, log_lines in self._failed_tests.items():
# A test with no output would have crashed. No output is replaced
# by the GTestLogParser by a sentence indicating non-completion.
if DID_NOT_COMPLETE in log_lines:
self._crashed = True
self._crashed_test = test
# A test marked as flaky may also have crashed the app.
for test, log_lines in self._flaked_tests.items():
if DID_NOT_COMPLETE in log_lines:
self._crashed = True
self._crashed_test = test
self.__finalized = True
class GTestLogParser(object):
"""This helper class process GTest test output."""
def __init__(self):
# Test results from the parser.
self._result_collection = ResultCollection()
# State tracking for log parsing
self.completed = False
self._current_test = ''
self._failure_description = []
self._parsing_failures = False
self._spawning_launcher = False
# Line number currently being processed.
self._line_number = 0
# List of parsing errors, as human-readable strings.
self._internal_error_lines = []
# Tests are stored here as 'test.name': (status, [description]).
# The status should be one of ('started', 'OK', 'failed', 'timeout',
# 'warning'). Warning indicates that a test did not pass when run in
# parallel with other tests but passed when run alone. The description is
# a list of lines detailing the test's error, as reported in the log.
self._test_status = {}
# This may be either text or a number. It will be used in the phrase
# '%s disabled' or '%s flaky' on the waterfall display.
self._disabled_tests = 0
# Disabled tests by parsing the compiled tests json file output from GTest.
self._disabled_tests_from_compiled_tests_file = []
self._flaky_tests = 0
# Regular expressions for parsing GTest logs. Test names look like
# "x.y", with 0 or more "w/" prefixes and 0 or more "/z" suffixes.
# e.g.:
# SomeName/SomeTestCase.SomeTest/1
# SomeName/SomeTestCase/1.SomeTest
# SomeName/SomeTestCase/1.SomeTest/SomeModifider
test_name_regexp = r'((\w+/)*\w+\.\w+(/\w+)*)'
self._master_name_re = re.compile(r'\[Running for master: "([^"]*)"')
self.master_name = ''
self._test_name = re.compile(test_name_regexp)
self._test_start = re.compile(r'\[\s+RUN\s+\] ' + test_name_regexp)
self._test_ok = re.compile(r'\[\s+OK\s+\] ' + test_name_regexp)
self._test_fail = re.compile(r'\[\s+FAILED\s+\] ' + test_name_regexp)
self._test_passed = re.compile(r'\[\s+PASSED\s+\] \d+ tests?.')
self._spawning_test_launcher = re.compile(r'Using \d+ parallel jobs?.')
self._test_run_spawning = re.compile(r'\[\d+\/\d+\] ' + test_name_regexp +
' \([0-9]+ ms\)')
self._test_passed_spawning = re.compile(r'Tests took \d+ seconds?.')
self._test_retry_spawning = re.compile(
r'Retrying \d+ test[s]? \(retry #\d+\)')
self._test_skipped = re.compile(r'\[\s+SKIPPED\s+\] ' + test_name_regexp)
self._run_test_cases_line = re.compile(
r'\[\s*\d+\/\d+\]\s+[0-9\.]+s ' + test_name_regexp + ' .+')
self._test_timeout = re.compile(
r'Test timeout \([0-9]+ ms\) exceeded for ' + test_name_regexp)
self._disabled = re.compile(r'\s*YOU HAVE (\d+) DISABLED TEST')
self._flaky = re.compile(r'\s*YOU HAVE (\d+) FLAKY TEST')
self._retry_message = re.compile('RETRYING FAILED TESTS:')
self.retrying_failed = False
self._compiled_tests_file_path_re = re.compile(
'.*Wrote compiled tests to file: (\S+)')
self.TEST_STATUS_MAP = {
'OK': TEST_SUCCESS_LABEL,
'failed': TEST_FAILURE_LABEL,
'skipped': TEST_SKIPPED_LABEL,
'timeout': TEST_TIMEOUT_LABEL,
'warning': TEST_WARNING_LABEL
}
self.compiled_tests_file_path = None
self._tests_loc_map = {}
def GetCurrentTest(self):
return self._current_test
def GetResultCollection(self):
return self._result_collection
def _StatusOfTest(self, test):
"""Returns the status code for the given test, or 'not known'."""
test_status = self._test_status.get(test, ('not known', []))
return test_status[0]
def _TestsByStatus(self, status, include_fails, include_flaky):
"""Returns list of tests with the given status.
Args:
include_fails: If False, tests containing 'FAILS_' anywhere in their
names will be excluded from the list.
include_flaky: If False, tests containing 'FLAKY_' anywhere in their
names will be excluded from the list.
"""
test_list = [x[0] for x in self._test_status.items()
if self._StatusOfTest(x[0]) == status]
if not include_fails:
test_list = [x for x in test_list if x.find('FAILS_') == -1]
if not include_flaky:
test_list = [x for x in test_list if x.find('FLAKY_') == -1]
return test_list
def _RecordError(self, line, reason):
"""Record a log line that produced a parsing error.
Args:
line: text of the line at which the error occurred
reason: a string describing the error
"""
self._internal_error_lines.append('%s: %s [%s]' %
(self._line_number, line.strip(), reason))
def RunningTests(self):
"""Returns list of tests that appear to be currently running."""
return self._TestsByStatus('started', True, True)
def ParsingErrors(self):
"""Returns a list of lines that have caused parsing errors."""
return self._internal_error_lines
def ClearParsingErrors(self):
"""Clears the currently stored parsing errors."""
self._internal_error_lines = ['Cleared.']
def PassedTests(self, include_fails=False, include_flaky=False):
"""Returns list of tests that passed."""
return self._TestsByStatus('OK', include_fails, include_flaky)
def FailedTests(self, include_fails=False, include_flaky=False):
"""Returns list of tests that failed, timed out, or didn't finish
(crashed).
This list will be incorrect until the complete log has been processed,
because it will show currently running tests as having failed.
Args:
include_fails: If true, all failing tests with FAILS_ in their names will
be included. Otherwise, they will only be included if they crashed or
timed out.
include_flaky: If true, all failing tests with FLAKY_ in their names will
be included. Otherwise, they will only be included if they crashed or
timed out.
"""
return (self._TestsByStatus('failed', include_fails, include_flaky) +
self._TestsByStatus('timeout', True, True) +
self._TestsByStatus('warning', include_fails, include_flaky) +
self.RunningTests())
def SkippedTests(self, include_fails=False, include_flaky=False):
"""Returns list of tests that were skipped"""
return self._TestsByStatus('skipped', include_fails, include_flaky)
def TriesForTest(self, test):
"""Returns a list containing the state for all tries of the given test.
This parser doesn't support retries so a single result is returned."""
return [self.TEST_STATUS_MAP.get(self._StatusOfTest(test),
TEST_UNKNOWN_LABEL)]
def DisabledTests(self):
"""Returns the name of the disabled test (if there is only 1) or the number
of disabled tests.
"""
return self._disabled_tests
def DisabledTestsFromCompiledTestsFile(self):
"""Returns the list of disabled tests in format '{TestCaseName}/{TestName}'.
Find all test names starting with DISABLED_ from the compiled test json
file if there is one. If there isn't or error in parsing, returns an
empty list.
"""
return self._disabled_tests_from_compiled_tests_file
def FlakyTests(self):
"""Returns the name of the flaky test (if there is only 1) or the number
of flaky tests.
"""
return self._flaky_tests
def FailureDescription(self, test):
"""Returns a list containing the failure description for the given test.
If the test didn't fail or timeout, returns [].
"""
test_status = self._test_status.get(test, ('', []))
return ['%s: ' % test] + test_status[1]
def CompletedWithoutFailure(self):
"""Returns True if all tests completed and no tests failed unexpectedly."""
return self.completed and not self.FailedTests()
def Finalize(self):
"""Finalize for |self._result_collection|.
Called at the end to add unfinished tests and crash status for
self._result_collection.
"""
# Remaining logs after crash before exit.
raw_remaining_logs = self._failure_description
for test in self.RunningTests():
self._test_status[test][1].extend(
['Potential test logs from crash until the end of test program:'])
self._test_status[test][1].extend(raw_remaining_logs)
self._result_collection.add_test_result(
TestResult(
test,
TestStatus.CRASH,
test_log='\n'.join(self._test_status[test][1])))
self._result_collection.crashed = True
if not self.completed:
self._result_collection.crashed = True
def _ParseDuration(self, line):
"""Returns test duration in milliseconds, None if not present."""
# Test duration appears as suffix of status line like:
# "[ OK ] SomeTest.SomeTestName (539 ms)"
test_duration_regex = re.compile(r'\s+\(([0-9]+)\s+ms\)')
results = test_duration_regex.search(line)
if results:
return int(results.group(1))
return None
def ProcessLine(self, line):
"""This is called once with each line of the test log."""
# Track line number for error messages.
self._line_number += 1
# Some tests (net_unittests in particular) run subprocesses which can write
# stuff to shared stdout buffer. Sometimes such output appears between new
# line and gtest directives ('[ RUN ]', etc) which breaks the parser.
# Code below tries to detect such cases and recognize a mixed line as two
# separate lines.
# List of regexps that parses expects to find at the start of a line but
# which can be somewhere in the middle.
gtest_regexps = [
self._test_start,
self._test_ok,
self._test_fail,
self._test_passed,
self._test_skipped,
self._spawning_test_launcher,
self._test_run_spawning,
self._test_passed_spawning,
self._test_retry_spawning,
]
for regexp in gtest_regexps:
match = regexp.search(line)
if match:
break
if not match or match.start() == 0:
self._ProcessLine(line)
else:
self._ProcessLine(line[:match.start()])
self._ProcessLine(line[match.start():])
def _ProcessLine(self, line):
"""Parses the line and changes the state of parsed tests accordingly.
Will recognize newly started tests, OK or FAILED statuses, timeouts, etc.
"""
# Note: When sharding, the number of disabled and flaky tests will be read
# multiple times, so this will only show the most recent values (but they
# should all be the same anyway).
# Is it a line listing the master name?
if not self.master_name:
results = self._master_name_re.match(line)
if results:
self.master_name = results.group(1)
results = self._run_test_cases_line.match(line)
if results:
# A run_test_cases.py output.
if self._current_test:
if self._test_status[self._current_test][0] == 'started':
self._test_status[self._current_test] = (
'timeout', self._failure_description)
self._result_collection.add_test_result(
TestResult(
self._current_test,
TestStatus.ABORT,
test_log='\n'.join(self._failure_description)))
self._current_test = ''
self._failure_description = []
return
# Is it a line declaring spawning test runner? If so then
# we really don't need to parse any of the test cases since 1) they are
# in a different format 2) retries happen inside the test launcher itself.
results = self._spawning_test_launcher.match(line)
if results:
self._spawning_launcher = True
self._result_collection.spawning_test_launcher = True
return
# With the spawning test launcher the log format is slightly different.
# On failures the original gtest log format will be spit out to stdout
# from the subprocess and the regular log parsing will be used. If all
# tests succeeded in the subprocess only completion lines will be written
# out.
if self._spawning_launcher:
results = self._test_passed_spawning.match(line)
if results:
self.completed = True
self._current_test = ''
return
results = self._test_run_spawning.match(line)
if results:
test_name = results.group(1)
duration = self._ParseDuration(line)
status = self._StatusOfTest(test_name)
# If we encountered this line and it is not known then we know it
# passed.
if status in ('started', 'not known'):
self._test_status[test_name] = ('OK', [])
self._result_collection.add_test_result(
TestResult(test_name, TestStatus.PASS, duration=duration))
self._failure_description = []
self._current_test = ''
return
results = self._test_retry_spawning.match(line)
# We are retrying failed tests so mark them all as started again
# so that we will be in the correct state when we either see
# a failure or a completion result.
if results:
for test in self.FailedTests():
self._test_status[test] = ('started', [DID_NOT_COMPLETE])
self.retrying_failed = True
# Is it a line declaring all tests passed?
results = self._test_passed.match(line)
if results:
self.completed = True
self._current_test = ''
return
# Is it a line reporting disabled tests?
results = self._disabled.match(line)
if results:
try:
disabled = int(results.group(1))
except ValueError:
disabled = 0
if disabled > 0 and isinstance(self._disabled_tests, int):
self._disabled_tests = disabled
else:
# If we can't parse the line, at least give a heads-up. This is a
# safety net for a case that shouldn't happen but isn't a fatal error.
self._disabled_tests = 'some'
return
# Is it a line reporting flaky tests?
results = self._flaky.match(line)
if results:
try:
flaky = int(results.group(1))
except ValueError:
flaky = 0
if flaky > 0 and isinstance(self._flaky_tests, int):
self._flaky_tests = flaky
else:
# If we can't parse the line, at least give a heads-up. This is a
# safety net for a case that shouldn't happen but isn't a fatal error.
self._flaky_tests = 'some'
return
# Is it the start of a test?
results = self._test_start.match(line)
if results:
if self._current_test:
if self._test_status[self._current_test][0] == 'started':
self._test_status[self._current_test] = (
'timeout', self._failure_description)
self._result_collection.add_test_result(
TestResult(
self._current_test,
TestStatus.ABORT,
test_log='\n'.join(self._failure_description)))
test_name = results.group(1)
self._test_status[test_name] = ('started', [DID_NOT_COMPLETE])
self._current_test = test_name
if self.retrying_failed:
self._failure_description = self._test_status[test_name][1]
self._failure_description.extend(['', 'RETRY OUTPUT:', ''])
else:
self._failure_description = []
return
# Is it a test success line?
results = self._test_ok.match(line)
if results:
test_name = results.group(1)
status = self._StatusOfTest(test_name)
duration = self._ParseDuration(line)
if status != 'started':
self._RecordError(line, 'success while in status %s' % status)
if self.retrying_failed:
self._test_status[test_name] = ('warning', self._failure_description)
# This is a passed result. Previous failures were reported in separate
# TestResult objects.
self._result_collection.add_test_result(
TestResult(
test_name,
TestStatus.PASS,
duration=duration,
test_log='\n'.join(self._failure_description)))
else:
self._test_status[test_name] = ('OK', [])
self._result_collection.add_test_result(
TestResult(test_name, TestStatus.PASS, duration=duration))
self._failure_description = []
self._current_test = ''
return
# Is it a test skipped line?
results = self._test_skipped.match(line)
if results:
test_name = results.group(1)
status = self._StatusOfTest(test_name)
# Skipped tests are listed again in the summary.
if status not in ('started', 'skipped'):
self._RecordError(line, 'skipped while in status %s' % status)
self._test_status[test_name] = ('skipped', [])
self._result_collection.add_test_result(
TestResult(
test_name,
TestStatus.SKIP,
expected_status=TestStatus.SKIP,
test_log='Test skipped when running suite.'))
self._failure_description = []
self._current_test = ''
return
# Is it a test failure line?
results = self._test_fail.match(line)
if results:
test_name = results.group(1)
status = self._StatusOfTest(test_name)
duration = self._ParseDuration(line)
if status not in ('started', 'failed', 'timeout'):
self._RecordError(line, 'failure while in status %s' % status)
if self._current_test != test_name:
if self._current_test:
self._RecordError(
line,
'%s failure while in test %s' % (test_name, self._current_test))
return
# Don't overwrite the failure description when a failing test is listed a
# second time in the summary, or if it was already recorded as timing
# out.
if status not in ('failed', 'timeout'):
self._test_status[test_name] = ('failed', self._failure_description)
# Add to |test_results| regardless whether the test ran before.
self._result_collection.add_test_result(
TestResult(
test_name,
TestStatus.FAIL,
duration=duration,
test_log='\n'.join(self._failure_description)))
self._failure_description = []
self._current_test = ''
return
# Is it a test timeout line?
results = self._test_timeout.search(line)
if results:
test_name = results.group(1)
status = self._StatusOfTest(test_name)
if status not in ('started', 'failed'):
self._RecordError(line, 'timeout while in status %s' % status)
logs = self._failure_description + ['Killed (timed out).']
self._test_status[test_name] = ('timeout', logs)
self._result_collection.add_test_result(
TestResult(
test_name,
TestStatus.ABORT,
test_log='\n'.join(logs),
))
self._failure_description = []
self._current_test = ''
return
# Is it the start of the retry tests?
results = self._retry_message.match(line)
if results:
self.retrying_failed = True
return
# Is it the line containing path to the compiled tests json file?
results = self._compiled_tests_file_path_re.match(line)
if results:
self.compiled_tests_file_path = results.group(1)
LOGGER.info('Compiled tests json file path: %s' %
self.compiled_tests_file_path)
return
# Random line: if we're in a test, collect it for the failure description.
# Tests may run simultaneously, so this might be off, but it's worth a try.
# This also won't work if a test times out before it begins running.
if self._current_test:
self._failure_description.append(line)
# Parse the "Failing tests:" list at the end of the output, and add any
# additional failed tests to the list. For example, this includes tests
# that crash after the OK line.
if self._parsing_failures:
results = self._test_name.match(line)
if results:
test_name = results.group(1)
status = self._StatusOfTest(test_name)
if status in ('not known', 'OK'):
unknown_error_log = 'Unknown error, see stdio log.'
self._test_status[test_name] = ('failed', [unknown_error_log])
self._result_collection.add_test_result(
TestResult(
test_name, TestStatus.FAIL, test_log=unknown_error_log))
else:
self._parsing_failures = False
elif line.startswith('Failing tests:'):
self._parsing_failures = True
# host_test_file_path is compiled_tests_file_path by default on simulators.
# However, for device tests,
# it needs to be overridden with a path that exists on the host because
# compiled_tests_file_path is the path on the device in this case
def ParseAndPopulateTestResultLocations(self,
test_repo,
output_disabled_tests,
host_test_file_path=None):
try:
# TODO(crbug.com/40134137): Read the file when running on device.
# Parse compiled test file first. If output_disabled_tests is true,
# then include disabled tests in the test results.
if host_test_file_path is None:
host_test_file_path = self.compiled_tests_file_path
if host_test_file_path is None:
return
with open(host_test_file_path) as f:
disabled_tests_from_json = []
compiled_tests = json.load(f)
for single_test in compiled_tests:
test_case_name = single_test.get('test_case_name')
test_name = single_test.get('test_name')
test_file = single_test.get('file')
test_file = test_file.replace('../../', '//')
full_test_name = str('%s.%s' % (test_case_name, test_name))
self._tests_loc_map[full_test_name] = test_file
if test_case_name and test_name and test_name.startswith('DISABLED_'):
if output_disabled_tests:
test_loc = {'repo': test_repo, 'fileName': test_file}
disabled_tests_from_json.append(full_test_name)
self._result_collection.add_test_result(
TestResult(
full_test_name,
TestStatus.SKIP,
expected_status=TestStatus.SKIP,
test_log='Test disabled.',
test_loc=test_loc))
self._disabled_tests_from_compiled_tests_file = (
disabled_tests_from_json)
# Populate location info for test results in result collections
for test_result in self._result_collection.test_results:
test_file = self._tests_loc_map.get(test_result.name, None)
if test_file:
test_loc = {'repo': test_repo, 'fileName': test_file}
test_result.test_loc = test_loc
except Exception as e:
LOGGER.warning(
'Error when finding disabled tests in compiled tests json file: %s' %
e)
return