# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Methods related to outputting script results in a human-readable format.
Also probably a good example of how to *not* write HTML.
"""
import collections
import logging
import sys
import tempfile
from typing import Any, Dict, IO, List, Optional, Set, Union
import six
from unexpected_passes_common import data_types
# Used for posting Buganizer comments.
from blinkpy.w3c import buganizer
# Keys used in the per-builder string dicts to group a builder's steps by how
# the expectation fared there (see _ConvertTestExpectationMapToStringDict).
# FULL_PASS is additionally matched against dict keys when generating HTML to
# pick the highlighted CSS class.
FULL_PASS = 'Fully passed in the following'
PARTIAL_PASS = 'Partially passed in the following'
NEVER_PASS = 'Never passed in the following'
# Opening boilerplate for the HTML report: the document head, the CSS used by
# the (highlighted) collapsible group buttons and their hidden-by-default
# content divs, and the opening <body> tag. Written before any results.
HTML_HEADER = """\
<!DOCTYPE html>
<html>
<head>
<meta content="width=device-width">
<style>
.collapsible_group {
background-color: #757575;
border: none;
color: white;
font-size:20px;
outline: none;
text-align: left;
width: 100%;
}
.active_collapsible_group, .collapsible_group:hover {
background-color: #474747;
}
.highlighted_collapsible_group {
background-color: #008000;
border: none;
color: white;
font-size:20px;
outline: none;
text-align: left;
width: 100%;
}
.active_highlighted_collapsible_group, .highlighted_collapsible_group:hover {
background-color: #004d00;
}
.content {
background-color: #e1e4e8;
display: none;
padding: 0 25px;
}
button {
user-select: text;
}
h1 {
background-color: black;
color: white;
}
</style>
</head>
<body>
"""
# Closing boilerplate for the HTML report. The inline script:
#   1. Repeatedly converts "collapsible_group" buttons into
#      "highlighted_collapsible_group" ones, bubbling the highlight up to a
#      parent group whenever none of an element's siblings is still a
#      non-highlighted group.
#   2. Attaches click listeners that toggle the "active" class on a button and
#      show/hide the element that immediately follows it.
# The closing </body> and </html> tags follow the script.
HTML_FOOTER = """\
<script>
function OnClickImpl(element) {
let sibling = element.nextElementSibling;
if (sibling.style.display === "block") {
sibling.style.display = "none";
} else {
sibling.style.display = "block";
}
}
function OnClick() {
this.classList.toggle("active_collapsible_group");
OnClickImpl(this);
}
function OnClickHighlighted() {
this.classList.toggle("active_highlighted_collapsible_group");
OnClickImpl(this);
}
// Repeatedly bubble up the highlighted_collapsible_group class as long as all
// siblings are highlighted.
let found_element_to_convert = false;
do {
found_element_to_convert = false;
// Get an initial list of all highlighted_collapsible_groups.
let highlighted_collapsible_groups = document.getElementsByClassName(
"highlighted_collapsible_group");
let highlighted_list = [];
for (elem of highlighted_collapsible_groups) {
highlighted_list.push(elem);
}
// Bubble up the highlighted_collapsible_group class.
while (highlighted_list.length) {
elem = highlighted_list.shift();
if (elem.tagName == 'BODY') {
continue;
}
if (elem.classList.contains("content")) {
highlighted_list.push(elem.previousElementSibling);
continue;
}
if (elem.classList.contains("collapsible_group")) {
found_element_to_convert = true;
elem.classList.add("highlighted_collapsible_group");
elem.classList.remove("collapsible_group");
}
sibling_elements = elem.parentElement.children;
let found_non_highlighted_group = false;
for (e of sibling_elements) {
if (e.classList.contains("collapsible_group")) {
found_non_highlighted_group = true;
break
}
}
if (!found_non_highlighted_group) {
highlighted_list.push(elem.parentElement);
}
}
} while (found_element_to_convert);
// Apply OnClick listeners so [highlighted_]collapsible_groups properly
// shrink/expand.
let collapsible_groups = document.getElementsByClassName("collapsible_group");
for (element of collapsible_groups) {
element.addEventListener("click", OnClick);
}
highlighted_collapsible_groups = document.getElementsByClassName(
"highlighted_collapsible_group");
for (element of highlighted_collapsible_groups) {
element.addEventListener("click", OnClickHighlighted);
}
</script>
</body>
</html>
"""
# Headings written before each category of results, in both the plain text
# and HTML output formats.
SECTION_STALE = 'Stale Expectations (Passed 100% Everywhere, Can Remove)'
SECTION_SEMI_STALE = ('Semi Stale Expectations (Passed 100% In Some Places, '
                      'But Not Everywhere - Can Likely Be Modified But Not '
                      'Necessarily Removed)')
SECTION_ACTIVE = ('Active Expectations (Failed At Least Once Everywhere, '
                  'Likely Should Be Left Alone)')
SECTION_UNMATCHED = ('Unmatched Results (An Expectation Existed When The Test '
                     'Ran, But No Matching One Currently Exists OR The '
                     'Expectation Is Too New)')
SECTION_UNUSED = ('Unused Expectations (Indicative Of The Configuration No '
                  'Longer Being Tested Or Tags Changing)')
# Limits applied when wrapping bug lines for a CL description: the maximum
# number of bugs on one line and the maximum length of one line.
MAX_BUGS_PER_LINE = 5
MAX_CHARACTERS_PER_CL_LINE = 72
# Comment posted to orphaned bugs when they are not auto-closed. It is also
# compared against a bug's existing comments so it is not posted twice.
BUGANIZER_COMMENT = ('The unexpected pass finder removed the last expectation '
                     'associated with this bug. An associated CL should be '
                     'landing shortly, after which this bug can be closed once '
                     'a human confirms there is no more work to be done.')
# Anything the recursive output helpers (RecursivePrintToFile and
# _RecursiveHtmlToFile) accept: a dict, a list, or a plain string.
ElementType = Union[Dict[str, Any], List[str], str]
# String dict produced from a data_types.TestExpectationMap.
# Sample:
# {
#   expectation_file: {
#     test_name: {
#       expectation_summary: {
#         builder_name: {
#           'Fully passed in the following': [
#             step1,
#           ],
#           'Partially passed in the following': {
#             step2: [
#               failure_link,
#             ],
#           },
#           'Never passed in the following': [
#             step3,
#           ],
#         }
#       }
#     }
#   }
# }
FullOrNeverPassValue = List[str]
PartialPassValue = Dict[str, List[str]]
PassValue = Union[FullOrNeverPassValue, PartialPassValue]
BuilderToPassMap = Dict[str, Dict[str, PassValue]]
ExpectationToBuilderMap = Dict[str, BuilderToPassMap]
TestToExpectationMap = Dict[str, ExpectationToBuilderMap]
ExpectationFileStringDict = Dict[str, TestToExpectationMap]
# String dict produced from unmatched results.
# Sample:
# {
#   test_name: {
#     builder_name: {
#       step_name: [
#         individual_result_string_1,
#         individual_result_string_2,
#         ...
#       ],
#       ...
#     },
#     ...
#   },
#   ...
# }
StepToResultsMap = Dict[str, List[str]]
BuilderToStepMap = Dict[str, StepToResultsMap]
TestToBuilderStringDict = Dict[str, BuilderToStepMap]
# Per-category step values keyed by the FULL_PASS/NEVER_PASS/PARTIAL_PASS
# strings.
# Sample:
# {
#   result_output.FULL_PASS: {
#     builder_name: [
#       step_name (total passes / total builds)
#     ],
#   },
#   result_output.NEVER_PASS: {
#     builder_name: [
#       step_name (total passes / total builds)
#     ],
#   },
#   result_output.PARTIAL_PASS: {
#     builder_name: {
#       step_name (total passes / total builds): [
#         failure links,
#       ],
#     },
#   },
# }
FullOrNeverPassStepValue = List[str]
PartialPassStepValue = Dict[str, List[str]]
PassStepValue = Union[FullOrNeverPassStepValue, PartialPassStepValue]
# Builder name -> results that had no matching expectation.
UnmatchedResultsType = Dict[str, data_types.ResultListType]
# Expectation file -> expectations that had no matching results.
UnusedExpectation = Dict[str, List[data_types.Expectation]]
# Collection of bug URL strings, accepted as either a list or a set.
RemovedUrlsType = Union[List[str], Set[str]]
def OutputResults(stale_dict: data_types.TestExpectationMap,
                  semi_stale_dict: data_types.TestExpectationMap,
                  active_dict: data_types.TestExpectationMap,
                  unmatched_results: UnmatchedResultsType,
                  unused_expectations: UnusedExpectation,
                  output_format: str,
                  file_handle: Optional[IO] = None) -> None:
  """Writes the categorized script results to |file_handle|.

  Sections are emitted in a fixed order (stale, semi-stale, active, unused,
  unmatched) and a section is skipped entirely when it has nothing to show.

  Args:
    stale_dict: A data_types.TestExpectationMap containing all the stale
        expectations.
    semi_stale_dict: A data_types.TestExpectationMap containing all the
        semi-stale expectations.
    active_dict: A data_types.TestExpectationMap containing all the active
        expectations.
    unmatched_results: Any unmatched results found while filling
        |test_expectation_map|, as returned by
        queries.FillExpectationMapFor[Ci|Try]Builders().
    unused_expectations: A dict from expectation file (str) to list of
        unmatched Expectations that were pulled out of |test_expectation_map|.
    output_format: A string denoting the format to output to. Valid values are
        "print" and "html".
    file_handle: An optional open file-like object to output to. If not
        specified, sys.stdout is used for "print" and a newly created
        temporary .html file is used for "html".

  Raises:
    RuntimeError: If |output_format| is not one of the supported values.
  """
  for expectation_map in (stale_dict, semi_stale_dict, active_dict):
    assert isinstance(expectation_map, data_types.TestExpectationMap)

  logging.info('Outputting results in format %s', output_format)

  stale_strs = _ConvertTestExpectationMapToStringDict(stale_dict)
  semi_stale_strs = _ConvertTestExpectationMapToStringDict(semi_stale_dict)
  active_strs = _ConvertTestExpectationMapToStringDict(active_dict)
  unmatched_strs = _ConvertUnmatchedResultsToStringDict(unmatched_results)
  unused_strs = _ConvertUnusedExpectationsToStringDict(unused_expectations)

  # Each entry is (heading, value whose truthiness gates the section, string
  # data to render, text written immediately before the heading).
  sections = (
      (SECTION_STALE, stale_dict, stale_strs, ''),
      (SECTION_SEMI_STALE, semi_stale_dict, semi_stale_strs, ''),
      (SECTION_ACTIVE, active_dict, active_strs, ''),
      (SECTION_UNUSED, unused_strs, unused_strs, '\n'),
      (SECTION_UNMATCHED, unmatched_strs, unmatched_strs, '\n'),
  )

  if output_format == 'print':
    file_handle = file_handle or sys.stdout
    for heading, gate, rendered, lead in sections:
      if gate:
        file_handle.write(lead + heading + '\n')
        RecursivePrintToFile(rendered, 0, file_handle)
  elif output_format == 'html':
    # Only close the handle if it was opened here; a caller-provided handle
    # stays open for the caller to manage.
    opened_here = not file_handle
    if opened_here:
      file_handle = tempfile.NamedTemporaryFile(delete=False,
                                                suffix='.html',
                                                mode='w')

    file_handle.write(HTML_HEADER)
    for heading, gate, rendered, lead in sections:
      if gate:
        file_handle.write(lead + '<h1>' + heading + '</h1>\n')
        _RecursiveHtmlToFile(rendered, file_handle)
    file_handle.write(HTML_FOOTER)

    if opened_here:
      file_handle.close()
    print('Results available at file://%s' % file_handle.name)
  else:
    raise RuntimeError('Unsupported output format %s' % output_format)
def RecursivePrintToFile(element: ElementType, depth: int,
                         file_handle: IO) -> None:
  """Recursively prints |element| as text to |file_handle|.

  Strings are written on their own line, prefixed with one space per level of
  |depth|. Dict keys are written at the current depth and their values one
  level deeper. List items are written at the current depth. None is written
  as the string "None".

  Args:
    element: A dict, list, or str to output.
    depth: The current depth of the recursion as an int.
    file_handle: An open file-like object to output to.

  Raises:
    RuntimeError: If |element| (or anything nested in it) is not a dict, list,
        str, or None.
  """
  if element is None:
    element = str(element)
  # This file only parses on Python 3 (it uses annotation syntax), where
  # six.string_types is just (str,), so check against str directly instead of
  # going through the Python 2 compatibility shim.
  if isinstance(element, str):
    file_handle.write((' ' * depth) + element + '\n')
  elif isinstance(element, dict):
    for k, v in element.items():
      RecursivePrintToFile(k, depth, file_handle)
      RecursivePrintToFile(v, depth + 1, file_handle)
  elif isinstance(element, list):
    for i in element:
      RecursivePrintToFile(i, depth, file_handle)
  else:
    raise RuntimeError('Given unhandled type %s' % type(element))
def _RecursiveHtmlToFile(element: ElementType, file_handle: IO) -> None:
"""Recursively outputs |element| as HTMl to |file_handle|.
Iterables will be output as a collapsible section containing any of the
iterable's contents.
Any link-like text will be turned into anchor tags.
Args:
element: A dict, list, or str/unicode to output.
file_handle: An open file-like object to output to.
"""
if isinstance(element, six.string_types):
file_handle.write('<p>%s</p>\n' % _LinkifyString(element))
elif isinstance(element, dict):
for k, v in element.items():
html_class = 'collapsible_group'
# This allows us to later (in JavaScript) recursively highlight sections
# that are likely of interest to the user, i.e. whose expectations can be
# modified.
if k and FULL_PASS in k:
html_class = 'highlighted_collapsible_group'
file_handle.write('<button type="button" class="%s">%s</button>\n' %
(html_class, k))
file_handle.write('<div class="content">\n')
_RecursiveHtmlToFile(v, file_handle)
file_handle.write('</div>\n')
elif isinstance(element, list):
for i in element:
_RecursiveHtmlToFile(i, file_handle)
else:
raise RuntimeError('Given unhandled type %s' % type(element))
def _LinkifyString(s: str) -> str:
"""Turns instances of links into anchor tags.
Args:
s: The string to linkify.
Returns:
A copy of |s| with instances of links turned into anchor tags pointing to
the link.
"""
for component in s.split():
if component.startswith('http'):
component = component.strip(',.!')
s = s.replace(component, '<a href="%s">%s</a>' % (component, component))
return s
def _ConvertTestExpectationMapToStringDict(
    test_expectation_map: data_types.TestExpectationMap
) -> ExpectationFileStringDict:
  """Builds a nested dict of strings describing |test_expectation_map|.

  Args:
    test_expectation_map: A data_types.TestExpectationMap.

  Returns:
    A string dictionary representation of |test_expectation_map| in the
    following format:
    {
      expectation_file: {
        test_name: {
          expectation_summary: {
            builder_name: {
              'Fully passed in the following': [
                step1,
              ],
              'Partially passed in the following': {
                step2: [
                  failure_link,
                ],
              },
              'Never passed in the following': [
                step3,
              ],
            }
          }
        }
      }
    }
    A pass category is only present for a builder if at least one step falls
    into it.
  """
  assert isinstance(test_expectation_map, data_types.TestExpectationMap)
  report = {}
  # Plain nested loops are used on purpose: per-builder state has to be reset
  # at a specific nesting level, which is clearer this way than with
  # data_types.TestExpectationMap's flattened iterators.
  for expectation_file, expectation_map in test_expectation_map.items():
    file_entry = {}
    report[expectation_file] = file_entry

    for expectation, builder_map in expectation_map.items():
      summary = _FormatExpectation(expectation)
      test_entry = file_entry.setdefault(expectation.test, {})
      summary_entry = {}
      test_entry[summary] = summary_entry

      for builder_name, step_map in builder_map.items():
        builder_entry = {}
        summary_entry[builder_name] = builder_entry

        always_passing = []
        sometimes_passing = {}
        never_passing = []
        for step_name, stats in step_map.items():
          if stats.NeverNeededExpectation(expectation):
            always_passing.append(AddStatsToStr(step_name, stats))
          elif stats.AlwaysNeededExpectation(expectation):
            never_passing.append(AddStatsToStr(step_name, stats))
          else:
            assert step_name not in sometimes_passing
            sometimes_passing[step_name] = stats

        # Categories are inserted in full/partial/never order.
        if always_passing:
          builder_entry[FULL_PASS] = always_passing
        if sometimes_passing:
          partial_entry = {}
          builder_entry[PARTIAL_PASS] = partial_entry
          for step_name, stats in sometimes_passing.items():
            label = AddStatsToStr(step_name, stats)
            partial_entry[label] = list(stats.failure_links)
        if never_passing:
          builder_entry[NEVER_PASS] = never_passing
  return report
def _ConvertUnmatchedResultsToStringDict(unmatched_results: UnmatchedResultsType
) -> TestToBuilderStringDict:
"""Converts |unmatched_results| to a dict of strings for reporting.
Args:
unmatched_results: A dict mapping builder names (string) to lists of
data_types.Result who did not have a matching expectation.
Returns:
A string dictionary representation of |unmatched_results| in the following
format:
{
test_name: {
builder_name: {
step_name: [
individual_result_string_1,
individual_result_string_2,
...
],
...
},
...
},
...
}
"""
output_dict = {}
for builder, results in unmatched_results.items():
for r in results:
builder_map = output_dict.setdefault(r.test, {})
step_map = builder_map.setdefault(builder, {})
result_str = 'Got "%s" on %s with tags [%s]' % (
r.actual_result, data_types.BuildLinkFromBuildId(
r.build_id), ' '.join(r.tags))
step_map.setdefault(r.step, []).append(result_str)
return output_dict
def _ConvertUnusedExpectationsToStringDict(
unused_expectations: UnusedExpectation) -> Dict[str, List[str]]:
"""Converts |unused_expectations| to a dict of strings for reporting.
Args:
unused_expectations: A dict mapping expectation file (str) to lists of
data_types.Expectation who did not have any matching results.
Returns:
A string dictionary representation of |unused_expectations| in the following
format:
{
expectation_file: [
expectation1,
expectation2,
],
}
The expectations are in a format similar to what would be present as a line
in an expectation file.
"""
output_dict = {}
for expectation_file, expectations in unused_expectations.items():
expectation_str_list = []
for e in expectations:
expectation_str_list.append(e.AsExpectationFileString())
output_dict[expectation_file] = expectation_str_list
return output_dict
def _FormatExpectation(expectation: data_types.Expectation) -> str:
return '"%s" expectation on "%s"' % (' '.join(
expectation.expected_results), ' '.join(expectation.tags))
def AddStatsToStr(s: str, stats: data_types.BuildStats) -> str:
  """Appends the string form of |stats| to |s|, separated by a space.

  Args:
    s: The string to append to, e.g. a step name.
    stats: The data_types.BuildStats whose GetStatsAsString() output is
        appended.

  Returns:
    |s| followed by a space and the stats string.
  """
  stats_str = stats.GetStatsAsString()
  return '%s %s' % (s, stats_str)
def OutputAffectedUrls(removed_urls: RemovedUrlsType,
                       orphaned_urls: Optional[RemovedUrlsType] = None,
                       bug_file_handle: Optional[IO] = None,
                       auto_close_bugs: bool = True) -> None:
  """Outputs URLs of affected expectations for easier consumption by the user.

  Outputs the following:

  1. A string suitable for passing to Chrome via the command line to
     open all bugs in the browser.
  2. A string suitable for copying into the CL description to associate the CL
     with all the affected bugs.
  3. A string containing any bugs that should be closable since there are no
     longer any associated expectations.

  Both URL collections are sorted before being output.

  Args:
    removed_urls: A set or list of strings containing bug URLs.
    orphaned_urls: A subset of |removed_urls| whose bugs no longer have any
        corresponding expectations.
    bug_file_handle: An optional open file-like object to write CL description
        bug information to. If not specified, will print to the terminal.
    auto_close_bugs: A boolean specifying whether bugs in |orphaned_urls| should
        be auto-closed on CL submission or not. If not closed, a comment will
        be posted instead.
  """
  sorted_removed = sorted(removed_urls)
  sorted_orphaned = sorted(orphaned_urls or [])

  _OutputAffectedUrls(sorted_removed, sorted_orphaned)
  _OutputUrlsForClDescription(sorted_removed,
                              sorted_orphaned,
                              file_handle=bug_file_handle,
                              auto_close_bugs=auto_close_bugs)
def _OutputAffectedUrls(affected_urls: List[str],
                        orphaned_urls: List[str],
                        file_handle: Optional[IO] = None) -> None:
  """Outputs the affected and closable bug URLs for opening in a browser.

  The "Closable bugs" line is only output when |orphaned_urls| is non-empty.

  Args:
    affected_urls: A list of strings containing URLs to output.
    orphaned_urls: A list of strings containing URLs to output as closable.
    file_handle: A file handle to write the string to. Defaults to stdout.
  """
  labelled_urls = [(affected_urls, 'Affected bugs')]
  if orphaned_urls:
    labelled_urls.append((orphaned_urls, 'Closable bugs'))
  for urls, label in labelled_urls:
    _OutputUrlsForCommandLine(urls, label, file_handle)
def _OutputUrlsForCommandLine(urls: List[str],
description: str,
file_handle: Optional[IO] = None) -> None:
"""Outputs |urls| for opening in a browser.
The output string is meant to be passed to a browser via the command line in
order to open all URLs in that browser, e.g.
`google-chrome https://crbug.com/1234 https://crbug.com/2345`
Args:
urls: A list of strings containing URLs to output.
description: A description of the URLs to be output.
file_handle: A file handle to write the string to. Defaults to stdout.
"""
file_handle = file_handle or sys.stdout
def _StartsWithHttp(url: str) -> bool:
return url.startswith('https://') or url.startswith('http://')
urls = [u if _StartsWithHttp(u) else 'https://%s' % u for u in urls]
file_handle.write('%s: %s\n' % (description, ' '.join(urls)))
def _OutputUrlsForClDescription(affected_urls: List[str],
orphaned_urls: List[str],
file_handle: Optional[IO] = None,
auto_close_bugs: bool = True) -> None:
"""Outputs |urls| for use in a CL description.
Output adheres to the line length recommendation and max number of bugs per
line supported in Gerrit.
Args:
affected_urls: A list of strings containing URLs to output.
orphaned_urls: A list of strings containing URLs to output as closable.
file_handle: A file handle to write the string to. Defaults to stdout.
auto_close_bugs: A boolean specifying whether bugs in |orphaned_urls| should
be auto-closed on CL submission or not. If not closed, a comment will
be posted instead.
"""
def AddBugTypeToOutputString(urls, prefix):
output_str = ''
current_line = ''
bugs_on_line = 0
urls = collections.deque(urls)
while len(urls):
current_bug = urls.popleft()
current_bug = current_bug.split('crbug.com/', 1)[1]
# Handles cases like crbug.com/angleproject/1234.
current_bug = current_bug.replace('/', ':')
# First bug on the line.
if not current_line:
current_line = '%s %s' % (prefix, current_bug)
# Bug or length limit hit for line.
elif (
len(current_line) + len(current_bug) + 2 > MAX_CHARACTERS_PER_CL_LINE
or bugs_on_line >= MAX_BUGS_PER_LINE):
output_str += current_line + '\n'
bugs_on_line = 0
current_line = '%s %s' % (prefix, current_bug)
# Can add to current line.
else:
current_line += ', %s' % current_bug
bugs_on_line += 1
output_str += current_line + '\n'
return output_str
file_handle = file_handle or sys.stdout
affected_but_not_closable = set(affected_urls) - set(orphaned_urls)
affected_but_not_closable = list(affected_but_not_closable)
affected_but_not_closable.sort()
output_str = ''
if affected_but_not_closable:
output_str += AddBugTypeToOutputString(affected_but_not_closable, 'Bug:')
if orphaned_urls:
if auto_close_bugs:
output_str += AddBugTypeToOutputString(orphaned_urls, 'Fixed:')
else:
output_str += AddBugTypeToOutputString(orphaned_urls, 'Bug:')
_PostCommentsToOrphanedBugs(orphaned_urls)
file_handle.write('Affected bugs for CL description:\n%s' % output_str)
def _PostCommentsToOrphanedBugs(orphaned_urls: List[str]) -> None:
  """Posts comments to bugs in |orphaned_urls| saying they can likely be closed.

  Does not post again if the comment has been posted before in the past.

  This is best-effort: failures to authenticate, fetch comments, or post a
  comment are logged rather than raised. A failure for one bug does not stop
  the remaining bugs from being processed.

  Args:
    orphaned_urls: A list of strings containing URLs to post comments to.
  """
  try:
    buganizer_client = _GetBuganizerClient()
  except buganizer.BuganizerError as e:
    logging.error(
        'Encountered error when authenticating, cannot post comments. %s', e)
    return

  for url in orphaned_urls:
    try:
      comment_list = buganizer_client.GetIssueComments(url)
      # GetIssueComments currently returns a dict if something goes wrong
      # instead of raising an exception.
      if isinstance(comment_list, dict):
        # No exception is being handled at this point, so logging.exception()
        # would append a meaningless "NoneType: None" traceback. Use
        # logging.error() instead.
        logging.error('Failed to get comments from %s: %s', url,
                      comment_list.get('error', 'error not provided'))
        continue
      existing_comments = [c['comment'] for c in comment_list]
      if BUGANIZER_COMMENT not in existing_comments:
        buganizer_client.NewComment(url, BUGANIZER_COMMENT)
    except buganizer.BuganizerError:
      logging.exception('Could not fetch or add comments for %s', url)
def _GetBuganizerClient() -> buganizer.BuganizerClient:
  """Creates and returns a new buganizer.BuganizerClient.

  Kept as a separate function so that client construction happens in exactly
  one place.
  """
  client = buganizer.BuganizerClient()
  return client