# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Run all Chromium libfuzzer targets that have corresponding corpora,
then save the profdata files.
* Example usage: run_all_fuzzers.py --fuzzer-binaries-dir foo
--fuzzer-corpora-dir bar --profdata-outdir baz
"""
import argparse
from multiprocessing import Process, Manager, cpu_count, Pool
import os
import subprocess
import glob
from typing import Mapping, Sequence
WHOLE_CORPUS_RETRIES = 2
WHOLE_CORPUS_TIMEOUT_SECS = 1200
INDIVIDUAL_TESTCASE_TIMEOUT_SECS = 60
INDIVIDUAL_TESTCASES_MAX_TO_TRY = 500
INDIVIDUAL_TESTCASES_SUCCESSES_NEEDED = 100
def _profdata_merge(inputs: Sequence[str], output: str) -> bool:
"""Merges the given profraw files into a single file.
Deletes any inputs, whether or not it succeeded.
Args:
inputs: paths to input files.
output: output file path.
Returns:
True if it worked.
"""
llvm_profdata_cmd = [llvm_profdata, 'merge', '-sparse'
] + inputs + ['-o', output]
try:
subprocess.check_call(llvm_profdata_cmd)
return True
except Exception as e:
# TODO(crbug.com/328849489: investigate failures
print("profdata merge failed, treating this target as failed")
finally:
for f in inputs:
if os.path.exists(f):
os.unlink(f)
return False
def _run_and_log(cmd: Sequence[str], env: Mapping[str, str], timeout: float,
annotation: str) -> bool:
"""Runs a given command and logs output in case of failure.
Args:
cmd: the command and its arguments.
env: environment variables to apply.
timeout: the timeout to apply, in seconds.
annotation: annotation to add to logging.
Returns:
True iff the command ran successfully.
"""
print(f"Trying command: {cmd} ({annotation})")
try:
subprocess.run(cmd,
env=env,
timeout=timeout,
capture_output=True,
check=True)
return True
except Exception as e:
if type(e) == subprocess.TimeoutExpired:
print(
f"Command {cmd!s} ({annotation}) timed out after {e.timeout!s} seconds"
)
else:
print(
f"Command {cmd!s} ({annotation}) return code: {e.returncode!s}\nStdout:\n{e.output}\nStderr:\n{e.stderr}"
)
return False
def _erase_profraws(pattern):
"""Erases any pre-existing profraws matching a LLVM_PROFILE_FILE pattern.
Parameters:
pattern: An LLVM_PROFILE_FILE environment variable value, which may
contain %p for a process ID
"""
pattern = pattern.replace("%p", "*")
for f in glob.iglob(pattern):
os.unlink(f)
def _matching_profraws(pattern):
"""Returns a list of filenames matching a given LLVM_PROFILE_FILE pattern.
Parameters:
pattern: An LLVM_PROFILE_FILE environment variable value, which may
contain %p for a process ID
"""
pattern = pattern.replace("%p", "*")
return [f for f in glob.iglob(pattern) if os.path.getsize(f) > 0]
def _run_fuzzer_target(args):
"""Runs a given fuzzer target. Designed to be called in parallel.
Parameters:
args[0]: A dict containing information about what to run. Must contain:
name: name of the fuzzer target
corpus_dir: where to find its corpus. May be None.
profraw_dir: the directory in which to create a .profraws temporarily
profdata_file: the output .profdata filename to create
env: a dict of additional environment variables. This function will
append profdata environment variables.
cmd: a list of command line arguments, including the binary name.
This function will append corpus entries.
args[1]: A multiprocessing.Manager.list for names of successful fuzzers.
args[2]: A multiprocessing.Manager.list for names of failed fuzzers.
args[3]: The number of targets (for logging purposes only)
Returns:
None.
"""
target_details = args[0]
verified_fuzzer_targets = args[1]
failed_targets = args[2]
num_targets = args[3]
target = target_details['name']
cmd = target_details['cmd']
env = target_details['env']
corpus_dir = target_details['corpus']
profraw_dir = target_details['profraw_dir']
target_profdata = target_details['profdata_file']
print("Starting target %s (completed %d/%d, of which %d succeeded)" %
(target, len(verified_fuzzer_targets) + len(failed_targets),
num_targets, len(verified_fuzzer_targets)))
fullcorpus_profraw = os.path.join(profraw_dir, target + "_%p.profraw")
env['LLVM_PROFILE_FILE'] = fullcorpus_profraw
fullcorpus_cmd = cmd.copy()
if corpus_dir is not None:
fullcorpus_cmd.append(corpus_dir)
_erase_profraws(fullcorpus_profraw)
for i in range(WHOLE_CORPUS_RETRIES):
ok = _run_and_log(fullcorpus_cmd, env, WHOLE_CORPUS_TIMEOUT_SECS,
f"full corpus attempt {i}")
if ok:
break
valid_profiles = 0
for profraw in _matching_profraws(fullcorpus_profraw):
ok = _profdata_merge([profraw], target_profdata)
if ok:
valid_profiles = 1
if valid_profiles == 0:
# We failed to run the fuzzer with the whole corpus in one go. That probably
# means one of the test cases caused a crash. Let's run each test
# case one at a time. The resulting profraw files can be hundreds of MB
# each so after each test case, we merge them into an accumulated
# profdata file.
for count, corpus_entry in enumerate(os.listdir(corpus_dir)):
specific_test_case_profraw = os.path.join(
profraw_dir, target + "_" + str(count) + "_%p.profraw")
test_case = os.path.join(corpus_dir, corpus_entry)
specific_test_case_cmd = cmd + [test_case]
env['LLVM_PROFILE_FILE'] = specific_test_case_profraw
_erase_profraws(specific_test_case_profraw)
_run_and_log(specific_test_case_cmd, env,
INDIVIDUAL_TESTCASE_TIMEOUT_SECS,
f"specific test case {count}")
resulting_profraws = list(_matching_profraws(specific_test_case_profraw))
if resulting_profraws:
# We recorded valid profraws, let's merge them into
# the accumulating profdata
valid_profiles += 1
temp_profdata = os.path.join(profraw_dir,
target + "_accumlated.profraw")
if os.path.exists(target_profdata):
os.rename(target_profdata, temp_profdata)
resulting_profraws.append(temp_profdata)
ok = _profdata_merge(resulting_profraws, target_profdata)
if not ok:
valid_profiles = 0
break
# The corpus may be huge - don't keep going forever.
if count > INDIVIDUAL_TESTCASES_MAX_TO_TRY:
print(
f"Skipping remaining test cases for {target} - >{INDIVIDUAL_TESTCASES_MAX_TO_TRY} tried"
)
break
# And if we've got enough valid coverage files, assume this is a
# reasonable approximation of the total coverage. This is partly
# to ensure the profdata command line isn't too huge, partly
# to reduce processing time to something reasonable, and partly
# because profraw files are huge and can fill up bot disk space.
if valid_profiles > INDIVIDUAL_TESTCASES_SUCCESSES_NEEDED:
print(
f"Skipping remaining test cases for {target}, >%{INDIVIDUAL_TESTCASES_SUCCESSES_NEEDED} valid profiles recorded."
)
break
if valid_profiles == 0:
failed_targets.append(target)
return
verified_fuzzer_targets.append(target)
print("Finishing target %s (completed %d/%d, of which %d succeeded)" %
(target, len(verified_fuzzer_targets) + len(failed_targets),
num_targets, len(verified_fuzzer_targets)))
def _ParseCommandArguments():
"""Adds and parses relevant arguments for tool commands.
Returns:
A dictionary representing the arguments.
"""
arg_parser = argparse.ArgumentParser()
arg_parser.usage = __doc__
arg_parser.add_argument(
'--fuzzer-binaries-dir',
required=True,
type=str,
help='Directory where the fuzzer binaries have been built.')
arg_parser.add_argument(
'--fuzzer-corpora-dir',
required=True,
type=str,
help='Directory into which corpora have been downloaded.')
arg_parser.add_argument('--profdata-outdir',
required=True,
type=str,
help='Directory where profdata will be stored.')
arg_parser.add_argument
args = arg_parser.parse_args()
return args
args = _ParseCommandArguments()
incomplete_targets = []
verified_fuzzer_targets = Manager().list()
failed_targets = Manager().list()
reportdir = 'out/report'
all_target_details = []
llvm_profdata = 'third_party/llvm-build/Release+Asserts/bin/llvm-profdata'
if not (os.path.isfile(llvm_profdata)):
print('No valid llvm_profdata at %s' % llvm_profdata)
exit(2)
if not (os.path.isdir(args.profdata_outdir)):
print('%s does not exist or is not a directory' % args.profdata_oudir)
exit(2)
for fuzzer_target in os.listdir(args.fuzzer_corpora_dir):
fuzzer_target_binpath = os.path.join(args.fuzzer_binaries_dir, fuzzer_target)
fuzzer_target_corporadir = os.path.join(args.fuzzer_corpora_dir,
fuzzer_target)
if not (os.path.isfile(fuzzer_target_binpath)
and os.path.isdir(fuzzer_target_corporadir)):
print(
('Could not find binary file for %s, or, the provided corpora path is '
'not a directory') % fuzzer_target)
incomplete_targets.append(fuzzer_target)
else:
env = dict()
if 'DISPLAY' in os.environ:
# Inherit X settings from the real environment
env['DISPLAY'] = os.environ['DISPLAY']
all_target_details.append({
'name':
fuzzer_target,
'profraw_dir':
reportdir,
'profdata_file':
os.path.join(reportdir, fuzzer_target + ".profdata"),
'env':
env,
# RSS limit 8GB. Some of our fuzzers which involve running significant
# chunks of Chromium code require more than the 2GB default.
'cmd': [fuzzer_target_binpath, '-runs=0', '-rss_limit_mb=8192'],
'corpus':
fuzzer_target_corporadir
})
# We also want to run ./chrome without a valid X server.
# It will almost immediately exit.
# This runs essentially no Chrome code, so will result in all the lines
# of code in the Chrome binary being marked as 0 in the code coverage
# report. Without doing this step, many of the files of Chrome source
# code simply don't appear in the coverage report at all.
chrome_target_binpath = os.path.join(args.fuzzer_binaries_dir, "chrome")
if not os.path.isfile(chrome_target_binpath):
print('Could not find binary file for Chrome itself')
else:
profraw_file = chrome_target_binpath + ".profraw"
profraw_path = os.path.join(reportdir, profraw_file)
env = {'DISPLAY': 'not-a-real-display'}
all_target_details.append({
'name':
"chrome",
'profraw_dir':
reportdir,
'profdata_file':
os.path.join(reportdir, "chrome.profdata"),
'env':
env,
'cmd': [chrome_target_binpath],
'corpus':
None
})
# Run the fuzzers in parallel.
cpu_count = int(cpu_count())
num_targets = len(all_target_details)
print("Running %d fuzzers across %d CPUs" % (num_targets, cpu_count))
with Pool(cpu_count) as p:
results = p.map(
_run_fuzzer_target,
[(target_details, verified_fuzzer_targets, failed_targets, num_targets)
for target_details in all_target_details])
print("Successful targets: %s" % verified_fuzzer_targets)
print("Failed targets: %s" % failed_targets)
print("Incomplete targets (couldn't find binary): %s" % incomplete_targets)
print("Finished getting coverage information. Copying to %s" %
args.profdata_outdir)
for fuzzer in verified_fuzzer_targets:
cmd = [
'cp',
os.path.join(reportdir, fuzzer + '.profdata'), args.profdata_outdir
]
print(cmd)
try:
subprocess.check_call(cmd)
except:
print.warning("Warning: failed to copy profdata for %s" % fuzzer)