"""util.py - General utilities for running, loading, and processing benchmarks
"""
import json
import os
import re
import subprocess
import sys
import tempfile
# Input file type enumeration: the classification codes handed back by
# classify_input_file() and check_input_file().
IT_Invalid, IT_JSON, IT_Executable = range(3)

# How many leading bytes is_executable_file() reads when sniffing a file:
# two on Windows, four on every other platform.
if sys.platform.startswith("win"):
    _num_magic_bytes = 2
else:
    _num_magic_bytes = 4
def is_executable_file(filename):
    """
    Report whether 'filename' looks like a native executable.

    Returns 'False' when 'filename' is not a regular file. Otherwise the
    first '_num_magic_bytes' bytes of the file are compared against the
    signature expected on the current platform: one of the Mach-O magic
    numbers on macOS, "MZ" on Windows, and the ELF magic everywhere else.
    """
    if not os.path.isfile(filename):
        return False

    with open(filename, mode="rb") as stream:
        header = stream.read(_num_magic_bytes)

    platform = sys.platform
    if platform == "darwin":
        mach_o_signatures = (
            b"\xfe\xed\xfa\xce",  # MH_MAGIC
            b"\xce\xfa\xed\xfe",  # MH_CIGAM
            b"\xfe\xed\xfa\xcf",  # MH_MAGIC_64
            b"\xcf\xfa\xed\xfe",  # MH_CIGAM_64
            b"\xca\xfe\xba\xbe",  # FAT_MAGIC
            b"\xbe\xba\xfe\xca",  # FAT_CIGAM
        )
        return header in mach_o_signatures
    if platform.startswith("win"):
        return header == b"MZ"
    return header == b"\x7FELF"
def is_json_file(filename):
    """
    Returns 'True' if 'filename' names a valid JSON output file.
    'False' otherwise.

    The check is a best-effort probe: the file is opened in text mode and
    parsed with 'json.load'. Any ordinary failure (missing or unreadable
    file, decoding error, malformed JSON, ...) yields 'False'.
    """
    try:
        with open(filename, "r") as f:
            json.load(f)
    except Exception:
        # Deliberately broad: every regular error simply means "not JSON".
        # 'Exception' (rather than 'BaseException') lets KeyboardInterrupt
        # and SystemExit propagate instead of being silently swallowed.
        return False
    return True
def classify_input_file(filename):
    """
    Work out what kind of input 'filename' is.

    Returns a tuple (type, msg). 'type' is 'IT_Executable' or 'IT_JSON' for
    a recognised file, in which case 'msg' is None. Otherwise 'type' is
    'IT_Invalid' and 'msg' is a human readable description of the problem.
    """
    if not os.path.exists(filename):
        return IT_Invalid, "'%s' does not exist" % filename
    if not os.path.isfile(filename):
        return IT_Invalid, "'%s' does not name a file" % filename
    # The executable check runs before the JSON check, as in the original
    # if/elif chain.
    if is_executable_file(filename):
        return IT_Executable, None
    if is_json_file(filename):
        return IT_JSON, None
    problem = (
        "'%s' does not name a valid benchmark executable or JSON file"
        % filename
    )
    return IT_Invalid, problem
def check_input_file(filename):
    """
    Classify 'filename' and hand back its classification.

    When the classification is 'IT_Invalid' the reason is printed and the
    program exits with status 1 instead of returning.
    """
    kind, reason = classify_input_file(filename)
    is_valid = kind != IT_Invalid
    if not is_valid:
        print("Invalid input file: %s" % reason)
        sys.exit(1)
    return kind
def find_benchmark_flag(prefix, benchmark_flags):
    """
    Look through 'benchmark_flags' for flags of the form `<prefix><arg>` and
    return the `<arg>` part. When the flag occurs several times the value of
    the last occurrence wins. None is returned when no flag matches.
    'prefix' must start with "--" and end with "=".
    """
    assert prefix.startswith("--") and prefix.endswith("=")
    skip = len(prefix)
    values = [
        flag[skip:] for flag in benchmark_flags if flag.startswith(prefix)
    ]
    return values[-1] if values else None
def remove_benchmark_flags(prefix, benchmark_flags):
    """
    Build and return a new list holding every entry of 'benchmark_flags'
    that does not begin with 'prefix'. The input is left untouched.
    'prefix' must start with "--" and end with "=".
    """
    assert prefix.startswith("--") and prefix.endswith("=")
    kept = []
    for flag in benchmark_flags:
        if flag.startswith(prefix):
            continue
        kept.append(flag)
    return kept
def load_benchmark_results(fname, benchmark_filter):
    """
    Read benchmark output from a file and return the JSON object.

    Apply benchmark_filter, a regular expression, with nearly the same
    semantics of the --benchmark_filter argument. May be None, in which
    case every benchmark is kept. The filter is matched with 're.search'
    against each entry's "run_name" (falling back to "name").
    Note: the Python regular expression engine is used instead of the
    one used by the C++ code, which may produce different results
    in complex cases.

    If the file's "context" declares a "json_schema_version" other than 1,
    an error is printed and the program exits with status 1.

    REQUIRES: 'fname' names a file containing JSON benchmark output.
    """

    def benchmark_wanted(benchmark):
        if benchmark_filter is None:
            return True
        name = benchmark.get("run_name", None) or benchmark["name"]
        return re.search(benchmark_filter, name) is not None

    with open(fname, "r") as f:
        results = json.load(f)

    if "context" in results:
        context = results["context"]
        if "json_schema_version" in context:
            json_schema_version = context["json_schema_version"]
            if json_schema_version != 1:
                # '%s' rather than '%i': a non-numeric version (e.g. the
                # string "2") must still produce this diagnostic instead of
                # a TypeError raised by the formatting itself.
                print(
                    "In %s, got unsupported JSON schema version: %s, expected 1"
                    % (fname, json_schema_version)
                )
                sys.exit(1)

    if "benchmarks" in results:
        results["benchmarks"] = [
            benchmark
            for benchmark in results["benchmarks"]
            if benchmark_wanted(benchmark)
        ]
    return results
def sort_benchmark_results(result):
    """
    Reorder result["benchmarks"] and return 'result'.

    The entries are ordered by "family_index", then by
    "per_family_instance_index", then non-aggregate runs before aggregate
    ones, then by "repetition_index". A missing index counts as -1. The
    ordering is produced by successive stable sorts, innermost key first.
    """

    def repetition_of(entry):
        return entry["repetition_index"] if "repetition_index" in entry else -1

    def aggregate_rank(entry):
        is_aggregate = "run_type" in entry and entry["run_type"] == "aggregate"
        return 1 if is_aggregate else 0

    def instance_of(entry):
        if "per_family_instance_index" in entry:
            return entry["per_family_instance_index"]
        return -1

    def family_of(entry):
        return entry["family_index"] if "family_index" in entry else -1

    ordered = list(result["benchmarks"])
    # From inner key to the outer key! Each pass is stable, so the later
    # (outer) keys dominate while earlier passes break ties.
    for key_fn in (repetition_of, aggregate_rank, instance_of, family_of):
        ordered.sort(key=key_fn)

    result["benchmarks"] = ordered
    return result
def run_benchmark(exe_name, benchmark_flags):
    """
    Run a benchmark specified by 'exe_name' with the specified
    'benchmark_flags'. The benchmark is run directly as a subprocess to preserve
    real time console output.

    If 'benchmark_flags' contains no '--benchmark_out=' flag, a temporary
    file is created to receive the output and is removed again before this
    function returns or raises. If the subprocess exits with a non-zero
    status, "TEST FAILED..." is printed and the program exits with that
    status.

    RETURNS: A JSON object representing the benchmark output
    """
    output_name = find_benchmark_flag("--benchmark_out=", benchmark_flags)
    is_temp_output = output_name is None
    if is_temp_output:
        thandle, output_name = tempfile.mkstemp()
        # Only the path is needed; the benchmark process opens the file itself.
        os.close(thandle)
        benchmark_flags = list(benchmark_flags) + [
            "--benchmark_out=%s" % output_name
        ]

    try:
        cmd = [exe_name] + list(benchmark_flags)
        print("RUNNING: %s" % " ".join(cmd))
        exit_code = subprocess.call(cmd)
        if exit_code != 0:
            print("TEST FAILED...")
            sys.exit(exit_code)
        return load_benchmark_results(output_name, None)
    finally:
        # Clean up on every path, including a failed run (SystemExit) or an
        # unreadable output file, so the temporary file is never leaked.
        if is_temp_output:
            os.unlink(output_name)
def run_or_load_benchmark(filename, benchmark_flags):
    """
    Produce the results for the benchmark named by 'filename'.

    An executable is run with 'benchmark_flags' and its output returned.
    A JSON output file is loaded instead, keeping only the benchmarks that
    match the '--benchmark_filter=' flag when one is present in
    'benchmark_flags'. Invalid inputs are rejected by 'check_input_file'.
    """
    kind = check_input_file(filename)
    if kind == IT_Executable:
        return run_benchmark(filename, benchmark_flags)
    if kind == IT_JSON:
        name_pattern = find_benchmark_flag(
            "--benchmark_filter=", benchmark_flags
        )
        return load_benchmark_results(filename, name_pattern)
    raise ValueError("Unknown file type %s" % kind)