# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# TODO(dawn:549) Move WebGPU caching tests to a separate module to trim file.
# pylint: disable=too-many-lines
from enum import Enum
import logging
import os
import posixpath
import sys
import tempfile
from typing import Any, Generator, Iterator, List, Optional, Set, Tuple
import unittest
import dataclasses # Built-in, but pylint gives an ordering false positive.
from gpu_tests import common_browser_args as cba
from gpu_tests import common_typing as ct
from gpu_tests import gpu_integration_test
from gpu_tests import overlay_support
from gpu_tests import trace_test_pages
from gpu_tests.util import host_information
import gpu_path_util
from telemetry.timeline import model as model_module
from telemetry.timeline import tracing_config
gpu_data_relative_path = gpu_path_util.GPU_DATA_RELATIVE_PATH
data_paths = [
gpu_path_util.GPU_DATA_DIR,
os.path.join(gpu_path_util.CHROMIUM_SRC_DIR, 'media', 'test', 'data')
]
webgl_test_harness_script = r"""
var domAutomationController = {};
domAutomationController._finished = false;
domAutomationController._originalLog = window.console.log;
domAutomationController._messages = '';
domAutomationController.log = function(msg) {
domAutomationController._messages += msg + "\n";
domAutomationController._originalLog.apply(window.console, [msg]);
}
domAutomationController.send = function(msg) {
// Issue a read pixel to synchronize the gpu process to ensure
// the asynchronous category enabling is finished.
var temp_canvas = document.createElement("canvas")
temp_canvas.width = 1;
temp_canvas.height = 1;
var temp_gl = temp_canvas.getContext("experimental-webgl") ||
temp_canvas.getContext("webgl");
if (temp_gl) {
temp_gl.clear(temp_gl.COLOR_BUFFER_BIT);
var id = new Uint8Array(4);
temp_gl.readPixels(0, 0, 1, 1, temp_gl.RGBA, temp_gl.UNSIGNED_BYTE, id);
} else {
console.log('Failed to get WebGL context.');
}
domAutomationController._finished = true;
}
window.domAutomationController = domAutomationController;
"""
basic_test_harness_script = r"""
var domAutomationController = {};
domAutomationController._proceed = false;
domAutomationController._readyForActions = false;
domAutomationController._succeeded = false;
domAutomationController._finished = false;
domAutomationController._originalLog = window.console.log;
domAutomationController._messages = '';
domAutomationController.log = function(msg) {
domAutomationController._messages += msg + "\n";
domAutomationController._originalLog.apply(window.console, [msg]);
}
domAutomationController.send = function(msg) {
domAutomationController._proceed = true;
let lmsg = msg.toLowerCase();
if (lmsg == "ready") {
domAutomationController._readyForActions = true;
} else {
domAutomationController._finished = true;
if (lmsg == "success") {
domAutomationController._succeeded = true;
} else {
domAutomationController._succeeded = false;
}
}
}
window.domAutomationController = domAutomationController;
"""
_GET_STATISTICS_EVENT_NAME = 'GetFrameStatisticsMedia'
_SWAP_CHAIN_PRESENT_EVENT_NAME = 'SwapChain::Present'
_BEGIN_OVERLAY_ACCESS_EVENT_NAME = 'SkiaOutputDeviceDComp::BeginOverlayAccess'
_PRESENT_SWAP_CHAIN_EVENT_NAME = 'IDXGISwapChain1::Present1'
_HTML_CANVAS_NOTIFY_LISTENERS_CANVAS_CHANGED_EVENT_NAME =\
'HTMLCanvasElement::NotifyListenersCanvasChanged'
_STATIC_BITMAP_TO_VID_FRAME_CONVERT_EVENT_NAME =\
'StaticBitmapImageToVideoFrameCopier::Convert'
_MFD3D11VC_CAPTURE_EVENT_NAME = 'CopyTextureToGpuMemoryBuffer'
_MFD3D11VC_MAP_EVENT_NAME = 'GpuMemoryBufferTrackerWin::DuplicateAsUnsafeRegion'
_MFD3D11VC_PRESENT_EVENT_NAME = 'DXGISharedHandleState::AcquireKeyedMutex'
# Caching events and constants
_GPU_HOST_STORE_BLOB_EVENT_NAME =\
'GpuHostImpl::StoreBlobToDisk'
_WEBGPU_BLOB_CACHE_HIT_EVENT_NAME = \
'DawnCachingInterface::CacheHit'
# Refers to GpuDiskCacheType::kDawnWebGPU see the following header:
# gpu/ipc/common/gpu_disk_cache_type.h
_WEBGPU_CACHE_HANDLE_TYPE = 1
# Special 'other_args' keys that is used to pass additional arguments
_MIN_CACHE_HIT_KEY = 'min_cache_hits'
_PROFILE_DIR_KEY = 'profile_dir'
_PROFILE_TYPE_KEY = 'profile_type'
class _TraceTestOrigin(Enum):
"""Enum type for different origin types when navigating to URLs.
The default enum DEFAULT resolves URLs using the explicit localhost IP,
i.e. 127.0.0.1. LOCALHOST resolves URLs using 'localhost' instead. This is
useful when we want the navigations to hit the same resource but appear to
be from a different origin.
As an implementation detail, the values of the enums correspond to the name of
the SeriallyExecutedBrowserTestCase instance functions to get the URLs."""
DEFAULT = 'UrlOfStaticFilePath'
LOCALHOST = 'LocalhostUrlOfStaticFilePath'
@dataclasses.dataclass
class _TraceTestArguments():
"""Struct-like object for passing trace test arguments instead of dicts."""
browser_args: List[str]
category: str
test_harness_script: str
finish_js_condition: str
success_eval_func: str
other_args: dict
restart_browser: bool = True
origin: _TraceTestOrigin = _TraceTestOrigin.DEFAULT
@dataclasses.dataclass
class _CacheTraceTestArguments():
"""Struct-like object for passing persistent cache trace test arguments.
Cache trace tests consist of a series of normal trace test invocations that
are necessary in order to verify the expected caching behaviors. The tests
start with a first load page which is generally used to populate cache
entries. If |test_renavigation| is true, the same browser that opened the
first load page is navigated to each cache page in |cache_pages| and the cache
conditions are verified. Then, regardless of the value of |test_renavigation|,
we iterate across the |cache_pages| again and restart the browser for each
page using a new clean user data directory that is seeded with the contents
from the first load page, and verify the cache conditions. The seeding just
copies all files in the user data directory from the first load over, see
*BrowserFinder classes for more details on the seeding:
//third_party/catapult/telemetry/telemetry/internal/backends/chrome/
The renavigation tests are suitable when we are verifying for cache hits since
no new entries should be generated in the |cache_pages|. However, they are not
suitable for cache miss cases because each |cache_page| may cause entries to
be written to the cache, thereby causing subsequent |cache_pages| to see cache
hits when we actually expect them to be misses. Note this is not a problem
for the restarted browser case because each browser restart seeds a new
temporary directory with only the contents after the first load page.
"""
browser_args: List[str]
category: str
test_harness_script: str
finish_js_condition: str
first_load_eval_func: str
cache_eval_func: str
cache_pages: List[str]
cache_page_origin: _TraceTestOrigin = _TraceTestOrigin.DEFAULT
test_renavigation: bool = True
def GenerateFirstLoadTest(self) -> _TraceTestArguments:
"""Returns the trace test arguments for the first load cache test."""
return _TraceTestArguments(browser_args=self.browser_args,
category=self.category,
test_harness_script=self.test_harness_script,
finish_js_condition=self.finish_js_condition,
success_eval_func=self.first_load_eval_func,
other_args={},
restart_browser=True)
def GenerateCacheHitTests(
self, cache_args: Optional[dict]
) -> Generator[Tuple[str, _TraceTestArguments], None, None]:
"""Returns a generator for all cache hit trace tests.
First pass of tests just do a re-navigation, second pass restarts with a
seeded profile directory.
"""
if self.test_renavigation:
for cache_hit_page in self.cache_pages:
yield (posixpath.join(gpu_data_relative_path, cache_hit_page),
_TraceTestArguments(browser_args=self.browser_args,
category=self.category,
test_harness_script=self.test_harness_script,
finish_js_condition=self.finish_js_condition,
success_eval_func=self.cache_eval_func,
other_args=cache_args,
restart_browser=False,
origin=self.cache_page_origin))
for cache_hit_page in self.cache_pages:
yield (posixpath.join(gpu_data_relative_path, cache_hit_page),
_TraceTestArguments(browser_args=self.browser_args,
category=self.category,
test_harness_script=self.test_harness_script,
finish_js_condition=self.finish_js_condition,
success_eval_func=self.cache_eval_func,
other_args=cache_args,
restart_browser=True,
origin=self.cache_page_origin))
class TraceIntegrationTest(gpu_integration_test.GpuIntegrationTest):
"""Tests GPU traces are plumbed through properly.
Also tests that GPU Device traces show up on devices that support them."""
# All known prefixes used in GenerateGpuTests(). Necessary in order for the
# unittests to work properly since some tests are normally only generated on
# specific platforms.
known_test_prefixes = frozenset([
'OverlayModeTraceTest',
'SwapChainTraceTest',
'TraceTest',
'VideoPathTraceTest',
'WebGPUCachingTraceTest',
'WebGLCanvasCaptureTraceTest',
'WebGPUTraceTest',
])
@classmethod
def Name(cls) -> str:
return 'trace_test'
@classmethod
def _SuiteSupportsParallelTests(cls) -> bool:
return True
def _GetSerialGlobs(self) -> Set[str]:
serial_globs = set()
if host_information.IsWindows():
serial_globs |= {
# Flaky when run in parallel on Windows. For NVIDIA, it's likely due
# to the limited number of global overlay contexts supported. For
# Intel, it could be due to that same issue or something else. AMD
# may be able to run in parallel once we have an AMD fleet to confirm
# with.
'OverlayModeTraceTest_DirectComposition_Underlay*',
'OverlayModeTraceTest_DirectComposition_Video*',
}
return serial_globs
def _GetSerialTests(self) -> Set[str]:
serial_tests = set()
if host_information.IsLinux():
serial_tests |= {
# crbug.com/357559355
'TraceTest_Video_Media_Stream_Incompatible_Stride',
}
if host_information.IsMac():
serial_tests |= {
# Flaky when run in parallel on Mac.
'WebGPUTraceTest_WebGPUCanvasOneCopyCapture',
'WebGPUTraceTest_WebGPUCanvasDisableOneCopyCapture_Accelerated',
}
return serial_tests
@classmethod
def GenerateGpuTests(cls, options: ct.ParsedCmdArgs) -> ct.TestGenerator:
# pylint: disable=too-many-branches
# Include the device level trace tests, even though they're
# currently skipped on all platforms, to give a hint that they
# should perhaps be enabled in the future.
namespace = trace_test_pages.TraceTestPages
for p in namespace.DefaultPages('TraceTest'):
yield (p.name, posixpath.join(gpu_data_relative_path, p.url), [
_TraceTestArguments(
browser_args=p.browser_args,
category=cls._DisabledByDefaultTraceCategory('gpu.service'),
test_harness_script=webgl_test_harness_script,
finish_js_condition='domAutomationController._finished',
success_eval_func='CheckGLCategory',
other_args=p.other_args)
])
for p in namespace.VideoFromCanvasPages('WebGLCanvasCaptureTraceTest'):
yield (p.name, posixpath.join(gpu_data_relative_path, p.url), [
_TraceTestArguments(
browser_args=p.browser_args,
category='blink',
test_harness_script=basic_test_harness_script,
finish_js_condition='domAutomationController._finished',
success_eval_func='CheckWebGLCanvasCapture',
other_args=p.other_args)
])
for p in namespace.WebGPUCanvasCapturePages('WebGPUTraceTest'):
yield (p.name, posixpath.join(gpu_data_relative_path, p.url), [
_TraceTestArguments(
browser_args=p.browser_args,
category='blink',
test_harness_script=basic_test_harness_script,
finish_js_condition='domAutomationController._finished',
success_eval_func='CheckWebGPUCanvasCapture',
other_args=p.other_args)
])
if host_information.IsWindows():
for p in namespace.DirectCompositionPages('VideoPathTraceTest'):
yield (p.name, posixpath.join(gpu_data_relative_path, p.url), [
_TraceTestArguments(
browser_args=p.browser_args,
category=cls._DisabledByDefaultTraceCategory('gpu.service'),
test_harness_script=basic_test_harness_script,
finish_js_condition='domAutomationController._finished',
success_eval_func='CheckVideoPath',
other_args=p.other_args)
])
# The increased swap count is necessary for tests to consistently pass on
# NVIDIA since overlays can take ~35 frames to take effect. See
# crbug.com/1505609.
for p in namespace.DirectCompositionPages('OverlayModeTraceTest',
swap_count=60):
yield (p.name, posixpath.join(gpu_data_relative_path, p.url), [
_TraceTestArguments(
browser_args=p.browser_args,
category=cls._DisabledByDefaultTraceCategory('gpu.service'),
test_harness_script=basic_test_harness_script,
finish_js_condition='domAutomationController._finished',
success_eval_func='CheckOverlayMode',
other_args=p.other_args)
])
for p in namespace.LowLatencyPages('SwapChainTraceTest'):
yield (p.name, posixpath.join(gpu_data_relative_path, p.url), [
_TraceTestArguments(
browser_args=p.browser_args,
category='gpu',
test_harness_script=basic_test_harness_script,
finish_js_condition='domAutomationController._finished',
success_eval_func='CheckSwapChainPath',
other_args=p.other_args)
])
for p in namespace.RootSwapChainTests('SwapChainTraceTest'):
yield (p.name, posixpath.join(gpu_data_relative_path, p.url), [
_TraceTestArguments(
browser_args=p.browser_args,
category='gpu',
test_harness_script=basic_test_harness_script,
finish_js_condition='domAutomationController._finished',
success_eval_func='CheckSwapChainHasAlpha',
other_args=p.other_args)
])
for p in namespace.MediaFoundationD3D11VideoCaptureTests('TraceTest'):
yield (p.name, posixpath.join(gpu_data_relative_path, p.url), [
_TraceTestArguments(
browser_args=p.browser_args,
category='gpu,' +
cls._DisabledByDefaultTraceCategory('video_and_image_capture'),
test_harness_script=basic_test_harness_script,
finish_js_condition='domAutomationController._finished',
success_eval_func='CheckMediaFoundationD3D11VideoCapture',
other_args=p.other_args)
])
for test in namespace.WebGpuLoadReloadCachingTests(
'WebGPUCachingTraceTest'):
yield (test.name,
posixpath.join(gpu_data_relative_path, test.first_load_page), [
_CacheTraceTestArguments(
browser_args=test.browser_args,
category='gpu',
test_harness_script=basic_test_harness_script,
finish_js_condition='domAutomationController._finished',
first_load_eval_func='CheckWebGPUFirstLoadCache',
cache_eval_func='CheckWebGPUCacheHits',
cache_pages=test.cache_pages,
)
])
for test in namespace.WebGpuIncognitoCachingTests('WebGPUCachingTraceTest'):
yield (test.name,
posixpath.join(gpu_data_relative_path, test.first_load_page), [
_CacheTraceTestArguments(
browser_args=test.browser_args,
category='gpu',
test_harness_script=basic_test_harness_script,
finish_js_condition='domAutomationController._finished',
first_load_eval_func='CheckWebGPUCacheHits',
cache_eval_func='CheckNoWebGPUCacheHits',
cache_pages=test.cache_pages)
])
for test in namespace.WebGpuDifferentOriginCachingTests(
'WebGPUCachingTraceTest'):
yield (test.name,
posixpath.join(gpu_data_relative_path, test.first_load_page), [
_CacheTraceTestArguments(
browser_args=test.browser_args,
category='gpu',
test_harness_script=basic_test_harness_script,
finish_js_condition='domAutomationController._finished',
first_load_eval_func='CheckWebGPUFirstLoadCache',
cache_eval_func='CheckNoWebGPUCacheHits',
cache_pages=test.cache_pages,
cache_page_origin=_TraceTestOrigin.LOCALHOST,
test_renavigation=False)
])
for test in namespace.WebGpuCrossOriginCacheMissTests(
'WebGPUCachingTraceTest'):
yield (test.name,
posixpath.join(gpu_data_relative_path, test.first_load_page), [
_CacheTraceTestArguments(
browser_args=test.browser_args,
category='gpu',
test_harness_script=basic_test_harness_script,
finish_js_condition='domAutomationController._finished',
first_load_eval_func='CheckWebGPUFirstLoadCache',
cache_eval_func='CheckNoWebGPUCacheHits',
cache_pages=test.cache_pages,
test_renavigation=False)
])
def _RunActualGpuTraceTest(self,
test_path: str,
args: _TraceTestArguments,
profile_dir: Optional[str] = None,
profile_type: Optional[str] = None) -> dict:
"""Returns a dictionary generated via the success evaluation."""
if args.restart_browser:
# The version of this test in the old GPU test harness restarted the
# browser after each test, so continue to do that to match its behavior
# by default for legacy tests.
self.RestartBrowserWithArgs(args.browser_args,
profile_dir=profile_dir,
profile_type=profile_type)
# Set up tracing.
config = tracing_config.TracingConfig()
config.chrome_trace_config.category_filter.AddExcludedCategory('*')
if args.category.find(',') != -1:
config.chrome_trace_config.category_filter.AddFilterString(args.category)
else:
config.chrome_trace_config.category_filter.AddFilter(args.category)
config.enable_chrome_trace = True
tab = self.tab
tab.browser.platform.tracing_controller.StartTracing(config, 60)
# Perform page navigation.
url = getattr(self, args.origin.value)(test_path)
tab.Navigate(url, script_to_evaluate_on_commit=args.test_harness_script)
try:
tab.action_runner.WaitForJavaScriptCondition(args.finish_js_condition,
timeout=60)
finally:
test_messages = tab.EvaluateJavaScript(
'domAutomationController._messages')
if test_messages:
logging.info('Logging messages from the test:\n%s', test_messages)
# Stop tracing.
timeline_data = tab.browser.platform.tracing_controller.StopTracing()
# Evaluate success.
if args.success_eval_func:
timeline_model = model_module.TimelineModel(timeline_data)
event_iter = timeline_model.IterAllEvents(
event_type_predicate=timeline_model.IsSliceOrAsyncSlice)
prefixed_func_name = '_EvaluateSuccess_' + args.success_eval_func
results = getattr(self, prefixed_func_name)(args.category, event_iter,
args.other_args)
return results if results else {}
return {}
def RunActualGpuTest(self, test_path: str, args: ct.TestArgs) -> None:
params = args[0]
if isinstance(params, _TraceTestArguments):
self._RunActualGpuTraceTest(test_path, params)
elif isinstance(params, _CacheTraceTestArguments):
# Create a new temporary directory for each cache test that is run.
cache_profile_dir = tempfile.TemporaryDirectory()
# Run the first load page and get the number of expected cache hits.
load_params = params.GenerateFirstLoadTest()
results =\
self._RunActualGpuTraceTest(test_path,
load_params,
profile_dir=cache_profile_dir.name,
profile_type='exact')
# Generate and run the cache hit tests using the seeded cache dir.
for (hit_path, trace_params) in params.GenerateCacheHitTests(results):
self._RunActualGpuTraceTest(hit_path,
trace_params,
profile_dir=cache_profile_dir.name,
profile_type='clean')
@classmethod
def SetUpProcess(cls) -> None:
super(TraceIntegrationTest, cls).SetUpProcess()
cls.CustomizeBrowserArgs([])
cls.StartBrowser()
cls.SetStaticServerDirs(data_paths)
@classmethod
def GenerateBrowserArgs(cls, additional_args: List[str]) -> List[str]:
"""Adds default arguments to |additional_args|.
See the parent class' method documentation for additional information.
"""
default_args = super(TraceIntegrationTest,
cls).GenerateBrowserArgs(additional_args)
default_args.extend([
cba.ENABLE_LOGGING,
cba.ENABLE_EXPERIMENTAL_WEB_PLATFORM_FEATURES,
# --test-type=gpu is used to suppress the "stability and security will
# suffer" infobar caused by --enable-gpu-benchmarking which can
# interfere with these tests.
cba.TEST_TYPE_GPU,
])
return default_args
@staticmethod
def _SwapChainPresentationModeListToStr(
presentation_mode_list: List[int]) -> str:
modes = [
overlay_support.PresentationModeEventToStr(m)
for m in presentation_mode_list
]
return f'[{",".join(modes)}]'
@staticmethod
def _DisabledByDefaultTraceCategory(category: str) -> str:
return 'disabled-by-default-%s' % category
#########################################
# The test success evaluation functions
def _EvaluateSuccess_CheckGLCategory(self, category: str,
event_iterator: Iterator,
other_args: dict) -> None:
del other_args # Unused in this particular success evaluation.
for event in event_iterator:
if (event.category == category
and event.args.get('gl_category', None) == 'gpu_toplevel'):
break
else:
self.fail('Trace markers for GPU category %s were not found' % category)
def _GetVideoExpectations(self, other_args: dict) -> '_VideoExpectations':
"""Helper for creating expectations for CheckVideoPath and CheckOverlayMode.
Args:
other_args: The |other_args| arg passed into the test.
Returns:
A _VideoExpectations instance with zero_copy, pixel_format, no_overlay,
and presentation_mode filled in.
"""
gpu = self.browser.GetSystemInfo().gpu.devices[0]
overlay_bot_config = overlay_support.GetOverlayConfigForGpu(gpu)
expected = _VideoExpectations()
expected.zero_copy = other_args.get('zero_copy', None)
expected.pixel_format = other_args.get('pixel_format', None)
expected.no_overlay = other_args.get('no_overlay', False)
video_rotation = other_args.get('video_rotation',
overlay_support.VideoRotation.UNROTATED)
video_is_not_scaled = other_args.get('full_size', False)
codec = other_args.get('codec', overlay_support.ZeroCopyCodec.UNSPECIFIED)
if overlay_bot_config.supports_overlays:
expected.pixel_format = overlay_bot_config.GetExpectedPixelFormat(
forced_pixel_format=expected.pixel_format)
expected.presentation_mode = (
overlay_bot_config.GetExpectedPresentationMode(
expected_pixel_format=expected.pixel_format,
video_rotation=video_rotation))
if expected.zero_copy is None and not expected.no_overlay:
expected.zero_copy = overlay_bot_config.GetExpectedZeroCopyUsage(
expected_pixel_format=expected.pixel_format,
video_rotation=video_rotation,
fullsize=video_is_not_scaled,
codec=codec)
return expected
def _EvaluateSuccess_CheckVideoPath(self, category: str,
event_iterator: Iterator,
other_args: dict) -> None:
"""Verifies Chrome goes down the code path as expected.
Depending on whether hardware overlays are supported or not, which formats
are supported in overlays, whether video is downscaled or not, whether
video is rotated or not, Chrome's video presentation code path can be
different.
"""
os_name = self.browser.platform.GetOSName()
assert os_name and os_name.lower() == 'win'
other_args = other_args or {}
expected = self._GetVideoExpectations(other_args)
# Verify expectations through captured trace events.
for event in event_iterator:
if event.category != category:
continue
if event.name != _SWAP_CHAIN_PRESENT_EVENT_NAME:
continue
if expected.no_overlay:
self.fail('Expected no overlay got %s' % _SWAP_CHAIN_PRESENT_EVENT_NAME)
detected_pixel_format = event.args.get('PixelFormat', None)
if detected_pixel_format is None:
self.fail('PixelFormat is missing from event %s' %
_SWAP_CHAIN_PRESENT_EVENT_NAME)
if expected.pixel_format != detected_pixel_format:
self.fail('SwapChain pixel format mismatch, expected %s got %s' %
(expected.pixel_format, detected_pixel_format))
detected_zero_copy = event.args.get('ZeroCopy', None)
if detected_zero_copy is None:
self.fail('ZeroCopy is missing from event %s' %
_SWAP_CHAIN_PRESENT_EVENT_NAME)
if expected.zero_copy != detected_zero_copy:
self.fail('ZeroCopy mismatch, expected %s got %s' %
(expected.zero_copy, detected_zero_copy))
break
else:
if expected.no_overlay:
return
self.fail(
'Events with name %s were not found' % _SWAP_CHAIN_PRESENT_EVENT_NAME)
def _EvaluateSuccess_CheckOverlayMode(self, category: str,
event_iterator: Iterator,
other_args: dict) -> None:
"""Verifies video frames are promoted to overlays when supported."""
os_name = self.browser.platform.GetOSName()
assert os_name and os_name.lower() == 'win'
other_args = other_args or {}
expected = self._GetVideoExpectations(other_args)
presentation_mode_history = []
for event in event_iterator:
if event.category != category:
continue
if event.name != _GET_STATISTICS_EVENT_NAME:
continue
if expected.no_overlay:
self.fail('Expected no overlay got %s' % _GET_STATISTICS_EVENT_NAME)
detected_presentation_mode = event.args.get('CompositionMode', None)
if detected_presentation_mode is None:
self.fail('PresentationMode is missing from event %s' %
_GET_STATISTICS_EVENT_NAME)
presentation_mode_history.append(detected_presentation_mode)
if expected.no_overlay:
return
valid_entry_found = False
for index, mode in enumerate(reversed(presentation_mode_history)):
# Be more tolerant for the beginning frames in non-overlay mode.
# Only check the last three entries.
if index >= 3:
break
if mode in (overlay_support.PresentationModeEvent.NONE,
overlay_support.PresentationModeEvent.GET_STATISTICS_FAILED):
# Be more tolerant to avoid test flakiness
continue
if (overlay_support.PresentationModeEventToStr(mode)
!= expected.presentation_mode):
self.fail('SwapChain presentation mode mismatch, expected %s got %s' %
(expected.presentation_mode,
TraceIntegrationTest._SwapChainPresentationModeListToStr(
presentation_mode_history)))
valid_entry_found = True
if not valid_entry_found:
self.fail(
'No valid frame statistics being collected: %s' % TraceIntegrationTest
._SwapChainPresentationModeListToStr(presentation_mode_history))
def _EvaluateSuccess_CheckSwapChainPath(self, category: str,
event_iterator: Iterator,
other_args: dict) -> None:
"""Verifies that swap chains are used as expected for low latency canvas."""
os_name = self.browser.platform.GetOSName()
assert os_name and os_name.lower() == 'win'
gpu = self.browser.GetSystemInfo().gpu.devices[0]
overlay_bot_config = overlay_support.GetOverlayConfigForGpu(gpu)
assert overlay_bot_config.direct_composition
expect_no_overlay = other_args and other_args.get('no_overlay', False)
expect_overlay = not expect_no_overlay
found_overlay = False
# Verify expectations through captured trace events.
for event in event_iterator:
if event.category != category:
continue
if event.name != _BEGIN_OVERLAY_ACCESS_EVENT_NAME:
continue
debug_label = event.args.get('debug_label', None)
if debug_label == 'SwapChainBuffer':
found_overlay = True
break
if expect_overlay and not found_overlay:
self.fail(
'Overlay expected but not found: matching %s events were not found' %
_BEGIN_OVERLAY_ACCESS_EVENT_NAME)
elif expect_no_overlay and found_overlay:
self.fail(
'Overlay not expected but found: matching %s events were found' %
_BEGIN_OVERLAY_ACCESS_EVENT_NAME)
def _EvaluateSuccess_CheckSwapChainHasAlpha(self, category: str,
event_iterator: Iterator,
other_args: dict) -> None:
"""Verified that all DXGI swap chains are presented with the expected alpha
mode."""
os_name = self.browser.platform.GetOSName()
assert os_name and os_name.lower() == 'win'
gpu = self.browser.GetSystemInfo().gpu.devices[0]
overlay_bot_config = overlay_support.GetOverlayConfigForGpu(gpu)
assert overlay_bot_config.direct_composition
expect_has_alpha = other_args and other_args.get('has_alpha', False)
has_present_swap_chain_event_with_has_alpha = False
# Verify expectations through captured trace events.
for event in event_iterator:
if event.category != category:
continue
if event.name != _PRESENT_SWAP_CHAIN_EVENT_NAME:
continue
got_has_alpha = event.args.get('has_alpha', None)
if got_has_alpha is not None:
has_present_swap_chain_event_with_has_alpha = True
if expect_has_alpha != got_has_alpha:
self.fail(
f'Expected events with name {_PRESENT_SWAP_CHAIN_EVENT_NAME} with'
f' has_alpha expected {expect_has_alpha}, got {got_has_alpha}')
# It's also considered a failure if we did not see the expected event.
if not has_present_swap_chain_event_with_has_alpha:
self.fail(
f'Expected events with name {_PRESENT_SWAP_CHAIN_EVENT_NAME} and '
'has_alpha value, but were not found')
def _EvaluateSuccess_CheckWebGLCanvasCapture(self, category: str,
event_iterator: Iterator,
other_args: dict) -> None:
if other_args is None:
return
expected_one_copy = other_args.get('one_copy', None)
expected_accelerated_two_copy = other_args.get('accelerated_two_copy', None)
if expected_one_copy and expected_accelerated_two_copy:
self.fail('one_copy and accelerated_two_copy are mutually exclusive')
found_one_copy_event = False
found_accelerated_two_copy_event = False
# Verify expectations through captured trace events.
for event in event_iterator:
if event.category != category:
continue
if (expected_one_copy is not None and event.name ==
_HTML_CANVAS_NOTIFY_LISTENERS_CANVAS_CHANGED_EVENT_NAME):
detected_one_copy = event.args.get('one_copy_canvas_capture', None)
if detected_one_copy is not None:
found_one_copy_event = True
if expected_one_copy != detected_one_copy:
self.fail('one_copy_canvas_capture mismatch, expected %s got %s' %
(expected_one_copy, detected_one_copy))
elif (expected_accelerated_two_copy is not None
and event.name == _STATIC_BITMAP_TO_VID_FRAME_CONVERT_EVENT_NAME):
detected_accelerated_two_copy = event.args.get(
'accelerated_frame_pool_copy', None)
if detected_accelerated_two_copy is not None:
found_accelerated_two_copy_event = True
if expected_accelerated_two_copy != detected_accelerated_two_copy:
self.fail(
'accelerated_frame_pool_copy mismatch, expected %s got %s' %
(expected_accelerated_two_copy, detected_accelerated_two_copy))
if expected_one_copy is not None and found_one_copy_event is False:
self.fail('%s events with one_copy_canvas_capture were not found' %
_HTML_CANVAS_NOTIFY_LISTENERS_CANVAS_CHANGED_EVENT_NAME)
if (expected_accelerated_two_copy is not None
and found_accelerated_two_copy_event is False):
self.fail('%s events with accelerated_frame_pool_copy were not found' %
_STATIC_BITMAP_TO_VID_FRAME_CONVERT_EVENT_NAME)
def _EvaluateSuccess_CheckWebGPUCanvasCapture(self, category: str,
event_iterator: Iterator,
other_args: dict) -> None:
expected_one_copy = other_args.get('one_copy', None)
expected_accelerated_two_copy = other_args.get('accelerated_two_copy', None)
if expected_one_copy and expected_accelerated_two_copy:
self.fail('one_copy and accelerated_two_copy are mutually exclusive')
found_one_copy_event = False
found_accelerated_two_copy_event = False
# Verify expectations through captured trace events.
for event in event_iterator:
if event.category != category:
continue
if (expected_one_copy is not None and event.name ==
_HTML_CANVAS_NOTIFY_LISTENERS_CANVAS_CHANGED_EVENT_NAME):
detected_one_copy = event.args.get('one_copy_canvas_capture', None)
if detected_one_copy is not None:
found_one_copy_event = True
if expected_one_copy != detected_one_copy:
self.fail('one_copy_canvas_capture mismatch, expected %s got %s' %
(expected_one_copy, detected_one_copy))
elif (expected_accelerated_two_copy is not None
and event.name == _STATIC_BITMAP_TO_VID_FRAME_CONVERT_EVENT_NAME):
detected_accelerated_two_copy = event.args.get(
'accelerated_frame_pool_copy', None)
if detected_accelerated_two_copy is not None:
found_accelerated_two_copy_event = True
if expected_accelerated_two_copy != detected_accelerated_two_copy:
self.fail(
'accelerated_frame_pool_copy mismatch, expected %s got %s' %
(expected_accelerated_two_copy, detected_accelerated_two_copy))
if expected_one_copy is not None and found_one_copy_event is False:
self.fail('%s events with one_copy_canvas_capture were not found' %
_HTML_CANVAS_NOTIFY_LISTENERS_CANVAS_CHANGED_EVENT_NAME)
if (expected_accelerated_two_copy is not None
and found_accelerated_two_copy_event is False):
self.fail('%s events with accelerated_frame_pool_copy were not found' %
_STATIC_BITMAP_TO_VID_FRAME_CONVERT_EVENT_NAME)
def _EvaluateSuccess_CheckWebGPUFirstLoadCache(self, category: str,
event_iterator: Iterator,
_other_args: dict) -> dict:
stored_blobs = 0
for event in event_iterator:
if event.category != category:
continue
if event.name == _GPU_HOST_STORE_BLOB_EVENT_NAME:
if event.args.get('handle_type', None) == _WEBGPU_CACHE_HANDLE_TYPE:
stored_blobs += 1
if stored_blobs == 0:
self.fail('Expected at least 1 cache entry to be written.')
return {_MIN_CACHE_HIT_KEY: stored_blobs}
def _EvaluateSuccess_CheckWebGPUCacheHits(self, category: str,
event_iterator: Iterator,
other_args: dict) -> None:
cache_hits = 0
for event in event_iterator:
if event.category != category:
continue
if event.name == _GPU_HOST_STORE_BLOB_EVENT_NAME:
if event.args.get('handle_type', None) == _WEBGPU_CACHE_HANDLE_TYPE:
self.fail('Unexpected WebGPU cache entry was stored on reloaded page')
if event.name == _WEBGPU_BLOB_CACHE_HIT_EVENT_NAME:
cache_hits += 1
stored_blobs = other_args.get(_MIN_CACHE_HIT_KEY, 1)
if cache_hits == 0 or cache_hits < stored_blobs:
self.fail('WebGPU cache hits (%d) is 0 or less than blobs stored (%d).' %
(cache_hits, stored_blobs))
def _EvaluateSuccess_CheckNoWebGPUCacheHits(self, category: str,
event_iterator: Iterator,
_other_args: dict) -> None:
cache_hits = 0
for event in event_iterator:
if event.category != category:
continue
if event.name == _WEBGPU_BLOB_CACHE_HIT_EVENT_NAME:
cache_hits += 1
if cache_hits != 0:
self.fail('Expected 0 WebGPU cache hits, but got %d.' % cache_hits)
def _EvaluateSuccess_CheckMediaFoundationD3D11VideoCapture(
self, category: str, event_iterator: Iterator, _other_args: dict) -> None:
del category # Unused.
os_version = self.browser.platform.GetOSVersionName()
assert os_version
if os_version.lower() not in ['win10', 'win11']:
self.skipTest(
'MediaFoundationD3D11VideoCapture only available on win 10+')
js_succeeded = self.tab.EvaluateJavaScript(
'domAutomationController._succeeded')
self.assertTrue(js_succeeded)
found_events = {
_MFD3D11VC_CAPTURE_EVENT_NAME: False,
_MFD3D11VC_MAP_EVENT_NAME: False,
_MFD3D11VC_PRESENT_EVENT_NAME: False,
}
for event in event_iterator:
if event.name in found_events:
found_events[event.name] = True
for event_name, found in found_events.items():
if not found:
self.fail(f'No {event_name} events found')
@classmethod
def ExpectationsFiles(cls) -> List[str]:
return [
os.path.join(
os.path.dirname(os.path.abspath(__file__)), 'test_expectations',
'trace_test_expectations.txt')
]
@dataclasses.dataclass
class _VideoExpectations():
"""Struct-like object for passing around video test expectations."""
pixel_format: Optional[str] = None
zero_copy: Optional[bool] = None
no_overlay: Optional[bool] = None
presentation_mode: Optional[str] = None
def load_tests(loader: unittest.TestLoader, tests: Any,
pattern: Any) -> unittest.TestSuite:
del loader, tests, pattern # Unused.
return gpu_integration_test.LoadAllTestsInModule(sys.modules[__name__])