# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import json
import sys
import time
import traceback
from multiprocessing import current_process
from threading import Lock, current_thread
from .logtypes import (
Any,
Boolean,
Dict,
Int,
List,
Nullable,
Status,
SubStatus,
TestId,
TestList,
Tuple,
Unicode,
convertor_registry,
log_action,
)
"""Structured Logging for recording test results.
Allowed actions, and subfields:
suite_start
tests - List of test names
name - Name for the suite
run_info - Dictionary of run properties
add_subsuite
name - Name for the subsuite (must be unique)
run_info - Updates to the suite run_info (optional)
suite_end
test_start
test - ID for the test
path - Relative path to test (optional)
subsuite - Name of the subsuite to which test belongs (optional)
test_end
test - ID for the test
status [PASS | FAIL | OK | ERROR | TIMEOUT | CRASH |
ASSERT PRECONDITION_FAILED | SKIP] - test status
expected [As for status] - Status that the test was expected to get,
or absent if the test got the expected status
extra - Dictionary of harness-specific extra information e.g. debug info
known_intermittent - List of known intermittent statuses that should
not fail a test. eg. ['FAIL', 'TIMEOUT']
subsuite - Name of the subsuite to which test belongs (optional)
test_status
test - ID for the test
subtest - Name of the subtest
status [PASS | FAIL | TIMEOUT |
PRECONDITION_FAILED | NOTRUN | SKIP] - test status
expected [As for status] - Status that the subtest was expected to get,
or absent if the subtest got the expected status
known_intermittent - List of known intermittent statuses that should
not fail a test. eg. ['FAIL', 'TIMEOUT']
subsuite - Name of the subsuite to which test belongs (optional)
process_output
process - PID of the process
command - Command line of the process
data - Output data from the process
test - ID of the test that the process was running (optional)
subsuite - Name of the subsuite that the process was running (optional)
assertion_count
count - Number of assertions produced
min_expected - Minimum expected number of assertions
max_expected - Maximum expected number of assertions
subsuite - Name of the subsuite for the tests that ran (optional)
lsan_leak
frames - List of stack frames from the leak report
scope - An identifier for the set of tests run during the browser session
(e.g. a directory name)
allowed_match - A stack frame in the list that matched a rule meaning the
leak is expected
subsuite - Name of the subsuite for the tests that ran (optional)
lsan_summary
bytes - Number of bytes leaked
allocations - Number of allocations
allowed - Boolean indicating whether all detected leaks matched allow rules
subsuite - Name of the subsuite for the tests that ran (optional)
mozleak_object
process - Process that leaked
bytes - Number of bytes that leaked
name - Name of the object that leaked
scope - An identifier for the set of tests run during the browser session
(e.g. a directory name)
allowed - Boolean indicating whether the leak was permitted
subsuite - Name of the subsuite for the tests that ran (optional)
log
level [CRITICAL | ERROR | WARNING |
INFO | DEBUG] - level of the logging message
message - Message to log
Subfields for all messages:
action - the action type of the current message
time - the timestamp in ms since the epoch of the log message
thread - name for the thread emitting the message
pid - id of the python process in which the logger is running
source - name for the source emitting the message
component - name of the subcomponent emitting the message
"""
_default_logger_name = None
def get_default_logger(component=None):
"""Gets the default logger if available, optionally tagged with component
name. Will return None if not yet set
:param component: The component name to tag log messages with
"""
global _default_logger_name
if not _default_logger_name:
return None
return StructuredLogger(_default_logger_name, component=component)
def set_default_logger(default_logger):
"""Sets the default logger to logger.
It can then be retrieved with :py:func:`get_default_logger`
Note that :py:func:`~mozlog.commandline.setup_logging` will
set a default logger for you, so there should be no need to call this
function if you're using setting up logging that way (recommended).
:param default_logger: The logger to set to default.
"""
global _default_logger_name
_default_logger_name = default_logger.name
log_levels = dict(
(k.upper(), v)
for v, k in enumerate(["critical", "error", "warning", "info", "debug"])
)
lint_levels = ["ERROR", "WARNING"]
def log_actions():
"""Returns the set of actions implemented by mozlog."""
return set(convertor_registry.keys())
class LoggerShutdownError(Exception):
"""Raised when attempting to log after logger.shutdown() has been called."""
class LoggerState(object):
def __init__(self):
self.reset()
def reset(self):
self.handlers = []
self.subsuites = set()
self.running_tests = set()
self.suite_started = False
self.component_states = {}
self.has_shutdown = False
class ComponentState(object):
def __init__(self):
self.filter_ = None
class StructuredLogger(object):
_lock = Lock()
_logger_states = {}
"""Create a structured logger with the given name
:param name: The name of the logger.
:param component: A subcomponent that the logger belongs to (typically a library name)
"""
def __init__(self, name, component=None):
self.name = name
self.component = component
with self._lock:
if name not in self._logger_states:
self._logger_states[name] = LoggerState()
if component not in self._logger_states[name].component_states:
self._logger_states[name].component_states[component] = ComponentState()
self._state = self._logger_states[name]
self._component_state = self._state.component_states[component]
def add_handler(self, handler):
"""Add a handler to the current logger"""
self._state.handlers.append(handler)
def remove_handler(self, handler):
"""Remove a handler from the current logger"""
self._state.handlers.remove(handler)
def reset_state(self):
"""Resets the logger to a brand new state. This means all handlers
are removed, running tests are discarded and components are reset.
"""
self._state.reset()
self._component_state = self._state.component_states[
self.component
] = ComponentState()
def send_message(self, topic, command, *args):
"""Send a message to each handler configured for this logger. This
part of the api is useful to those users requiring dynamic control
of a handler's behavior.
:param topic: The name used by handlers to subscribe to a message.
:param command: The name of the command to issue.
:param args: Any arguments known to the target for specialized
behavior.
"""
rv = []
for handler in self._state.handlers:
if hasattr(handler, "message_handler"):
rv += handler.message_handler.handle_message(topic, command, *args)
return rv
@property
def handlers(self):
"""A list of handlers that will be called when a
message is logged from this logger"""
return self._state.handlers
@property
def component_filter(self):
return self._component_state.filter_
@component_filter.setter
def component_filter(self, value):
self._component_state.filter_ = value
def log_raw(self, raw_data):
if "action" not in raw_data:
raise ValueError
action = raw_data["action"]
converted_data = convertor_registry[action].convert_known(**raw_data)
for k, v in raw_data.items():
if (
k not in converted_data and
k not in convertor_registry[action].optional_args
):
converted_data[k] = v
data = self._make_log_data(action, converted_data)
if action in ("test_status", "test_end"):
if (
data["expected"] == data["status"] or
data["status"] == "SKIP" or
"expected" not in raw_data
):
del data["expected"]
if not self._ensure_suite_state(action, data):
return
self._handle_log(data)
def _log_data(self, action, data=None):
if data is None:
data = {}
if data.get("subsuite") and data["subsuite"] not in self._state.subsuites:
self.error(f"Unrecognised subsuite {data['subsuite']}")
return
log_data = self._make_log_data(action, data)
self._handle_log(log_data)
def _handle_log(self, data):
if self._state.has_shutdown:
raise LoggerShutdownError(
"{} action received after shutdown.".format(data["action"])
)
with self._lock:
if self.component_filter:
data = self.component_filter(data)
if data is None:
return
for handler in self.handlers:
try:
handler(data)
except Exception:
# Write the exception details directly to stderr because
# log() would call this method again which is currently locked.
print(
"%s: Failure calling log handler:" % __name__,
file=sys.__stderr__,
)
print(traceback.format_exc(), file=sys.__stderr__)
def _make_log_data(self, action, data):
all_data = {
"action": action,
"time": int(time.time() * 1000),
"thread": current_thread().name,
"pid": current_process().pid,
"source": self.name,
}
if self.component:
all_data["component"] = self.component
all_data.update(data)
return all_data
def _ensure_suite_state(self, action, data):
if action == "suite_start":
if self._state.suite_started:
# limit data to reduce unnecessary log bloat
self.error(
"Got second suite_start message before suite_end. " +
"Logged with data: {}".format(json.dumps(data)[:100])
)
return False
self._state.suite_started = True
elif action == "suite_end":
if not self._state.suite_started:
self.error(
"Got suite_end message before suite_start. " +
"Logged with data: {}".format(json.dumps(data))
)
return False
self._state.suite_started = False
return True
@log_action(
TestList("tests"),
Unicode("name", default=None, optional=True),
Dict(Any, "run_info", default=None, optional=True),
Dict(Any, "version_info", default=None, optional=True),
Dict(Any, "device_info", default=None, optional=True),
Dict(Any, "extra", default=None, optional=True),
)
def suite_start(self, data):
"""Log a suite_start message
:param dict tests: Test identifiers that will be run in the suite, keyed by group name.
:param str name: Optional name to identify the suite.
:param dict run_info: Optional information typically provided by mozinfo.
:param dict version_info: Optional target application version information provided
by mozversion.
:param dict device_info: Optional target device information provided by mozdevice.
"""
if not self._ensure_suite_state("suite_start", data):
return
self._log_data("suite_start", data)
@log_action(
Unicode("name"),
Dict(Any, "run_info", default=None, optional=True),
)
def add_subsuite(self, data):
"""Log a add_subsuite message
:param str name: Name to identify the subsuite.
:param dict run_info: Optional information about the subsuite. This updates the suite run_info.
"""
if data["name"] in self._state.subsuites:
return
run_info = data.get("run_info", {"subsuite": data["name"]})
if "subsuite" not in run_info:
run_info = run_info.copy()
run_info["subsuite"] = data["name"]
data["run_info"] = run_info
self._state.subsuites.add(data["name"])
self._log_data("add_subsuite", data)
@log_action(Dict(Any, "extra", default=None, optional=True))
def suite_end(self, data):
"""Log a suite_end message"""
if not self._ensure_suite_state("suite_end", data):
return
self._state.subsuites.clear()
self._log_data("suite_end", data)
@log_action(
TestId("test"),
Unicode("path", default=None, optional=True),
Unicode("subsuite", default=None, optional=True),
)
def test_start(self, data):
"""Log a test_start message
:param test: Identifier of the test that will run.
:param path: Path to test relative to some base (typically the root of
the source tree).
:param subsuite: Optional name of the subsuite to which the test belongs.
"""
if not self._state.suite_started:
self.error(
"Got test_start message before suite_start for test %s" % data["test"]
)
return
test_key = (data.get("subsuite"), data["test"])
if test_key in self._state.running_tests:
self.error("test_start for %s logged while in progress." % data["test"])
return
self._state.running_tests.add(test_key)
self._log_data("test_start", data)
@log_action(
TestId("test"),
Unicode("subtest"),
SubStatus("status"),
SubStatus("expected", default="PASS"),
Unicode("message", default=None, optional=True),
Unicode("stack", default=None, optional=True),
Dict(Any, "extra", default=None, optional=True),
List(SubStatus, "known_intermittent", default=None, optional=True),
Unicode("subsuite", default=None, optional=True),
)
def test_status(self, data):
"""
Log a test_status message indicating a subtest result. Tests that
do not have subtests are not expected to produce test_status messages.
:param test: Identifier of the test that produced the result.
:param subtest: Name of the subtest.
:param status: Status string indicating the subtest result
:param expected: Status string indicating the expected subtest result.
:param message: Optional string containing a message associated with the result.
:param stack: Optional stack trace encountered during test execution.
:param extra: Optional suite-specific data associated with the test result.
:param known_intermittent: Optional list of string expected intermittent statuses
:param subsuite: Optional name of the subsuite to which the test belongs.
"""
if data["expected"] == data["status"] or data["status"] == "SKIP":
del data["expected"]
test_key = (data.get("subsuite"), data["test"])
if test_key not in self._state.running_tests:
self.error(
"test_status for %s logged while not in progress. "
"Logged with data: %s" % (data["test"], json.dumps(data))
)
return
self._log_data("test_status", data)
@log_action(
TestId("test"),
Status("status"),
Status("expected", default="OK"),
Unicode("message", default=None, optional=True),
Unicode("stack", default=None, optional=True),
Dict(Any, "extra", default=None, optional=True),
List(Status, "known_intermittent", default=None, optional=True),
Unicode("subsuite", default=None, optional=True),
)
def test_end(self, data):
"""
Log a test_end message indicating that a test completed. For tests
with subtests this indicates whether the overall test completed without
errors. For tests without subtests this indicates the test result
directly.
:param test: Identifier of the test that produced the result.
:param status: Status string indicating the test result
:param expected: Status string indicating the expected test result.
:param message: Optonal string containing a message associated with the result.
:param stack: Optional stack trace encountered during test execution.
:param extra: Optional suite-specific data associated with the test result.
:param subsuite: Optional name of the subsuite to which the test belongs.
"""
if data["expected"] == data["status"] or data["status"] == "SKIP":
del data["expected"]
test_key = (data.get("subsuite"), data["test"])
if test_key not in self._state.running_tests:
self.error(
"test_end for %s logged while not in progress. "
"Logged with data: %s" % (data["test"], json.dumps(data))
)
else:
self._state.running_tests.remove(test_key)
self._log_data("test_end", data)
@log_action(
Unicode("process"),
Unicode("data"),
Unicode("command", default=None, optional=True),
TestId("test", default=None, optional=True),
Unicode("subsuite", default=None, optional=True),
)
def process_output(self, data):
"""Log output from a managed process.
:param process: A unique identifier for the process producing the output
(typically the pid)
:param data: The output to log
:param command: Optional string representing the full command line used to start
the process.
:param test: Optional ID of the test which the process was running.
:param subsuite: Optional name of the subsuite which the process was running.
"""
self._log_data("process_output", data)
@log_action(
Unicode("process", default=None),
Unicode("signature", default="[Unknown]"),
TestId("test", default=None, optional=True),
Unicode("minidump_path", default=None, optional=True),
Unicode("minidump_extra", default=None, optional=True),
Int("stackwalk_retcode", default=None, optional=True),
Unicode("stackwalk_stdout", default=None, optional=True),
Unicode("stackwalk_stderr", default=None, optional=True),
Unicode("reason", default=None, optional=True),
Unicode("java_stack", default=None, optional=True),
Unicode("process_type", default=None, optional=True),
List(Unicode, "stackwalk_errors", default=None),
Unicode("subsuite", default=None, optional=True),
)
def crash(self, data):
if data["stackwalk_errors"] is None:
data["stackwalk_errors"] = []
self._log_data("crash", data)
@log_action(
Unicode("primary", default=None), List(Unicode, "secondary", default=None)
)
def valgrind_error(self, data):
self._log_data("valgrind_error", data)
@log_action(
Unicode("process"),
Unicode("command", default=None, optional=True),
Unicode("subsuite", default=None, optional=True),
)
def process_start(self, data):
"""Log start event of a process.
:param process: A unique identifier for the process producing the
output (typically the pid)
:param command: Optional string representing the full command line used to
start the process.
:param subsuite: Optional name of the subsuite using the process.
"""
self._log_data("process_start", data)
@log_action(
Unicode("process"),
Int("exitcode"),
Unicode("command", default=None, optional=True),
Unicode("subsuite", default=None, optional=True),
)
def process_exit(self, data):
"""Log exit event of a process.
:param process: A unique identifier for the process producing the
output (typically the pid)
:param exitcode: the exit code
:param command: Optional string representing the full command line used to
start the process.
:param subsuite: Optional name of the subsuite using the process.
"""
self._log_data("process_exit", data)
@log_action(
TestId("test"),
Int("count"),
Int("min_expected"),
Int("max_expected"),
Unicode("subsuite", default=None, optional=True),
)
def assertion_count(self, data):
"""Log count of assertions produced when running a test.
:param count: Number of assertions produced
:param min_expected: Minimum expected number of assertions
:param max_expected: Maximum expected number of assertions
:param subsuite: Optional name of the subsuite for the tests that ran
"""
self._log_data("assertion_count", data)
@log_action(
List(Unicode, "frames"),
Unicode("scope", optional=True, default=None),
Unicode("allowed_match", optional=True, default=None),
Unicode("subsuite", default=None, optional=True),
)
def lsan_leak(self, data):
self._log_data("lsan_leak", data)
@log_action(
Int("bytes"),
Int("allocations"),
Boolean("allowed", optional=True, default=False),
Unicode("subsuite", default=None, optional=True),
)
def lsan_summary(self, data):
self._log_data("lsan_summary", data)
@log_action(
Unicode("process"),
Int("bytes"),
Unicode("name"),
Unicode("scope", optional=True, default=None),
Boolean("allowed", optional=True, default=False),
Unicode("subsuite", default=None, optional=True),
)
def mozleak_object(self, data):
self._log_data("mozleak_object", data)
@log_action(
Unicode("process"),
Nullable(Int, "bytes"),
Int("threshold"),
List(Unicode, "objects"),
Unicode("scope", optional=True, default=None),
Boolean("induced_crash", optional=True, default=False),
Boolean("ignore_missing", optional=True, default=False),
Unicode("subsuite", default=None, optional=True),
)
def mozleak_total(self, data):
self._log_data("mozleak_total", data)
@log_action()
def shutdown(self, data):
"""Shutdown the logger.
This logs a 'shutdown' action after which any further attempts to use
the logger will raise a :exc:`LoggerShutdownError`.
This is also called implicitly from the destructor or
when exiting the context manager.
"""
self._log_data("shutdown", data)
self._state.has_shutdown = True
def __enter__(self):
return self
def __exit__(self, exc, val, tb):
self.shutdown()
def _log_func(level_name):
@log_action(Unicode("message"), Any("exc_info", default=False))
def log(self, data):
exc_info = data.pop("exc_info", None)
if exc_info:
if not isinstance(exc_info, tuple):
exc_info = sys.exc_info()
if exc_info != (None, None, None):
bt = traceback.format_exception(*exc_info)
data["stack"] = "\n".join(bt)
data["level"] = level_name
self._log_data("log", data)
log.__doc__ = (
"""Log a message with level %s
:param message: The string message to log
:param exc_info: Either a boolean indicating whether to include a traceback
derived from sys.exc_info() or a three-item tuple in the
same format as sys.exc_info() containing exception information
to log.
"""
% level_name
)
log.__name__ = str(level_name).lower()
return log
def _lint_func(level_name):
@log_action(
Unicode("path"),
Unicode("message", default=""),
Int("lineno", default=0),
Int("column", default=None, optional=True),
Unicode("hint", default=None, optional=True),
Unicode("source", default=None, optional=True),
Unicode("rule", default=None, optional=True),
Tuple((Int, Int), "lineoffset", default=None, optional=True),
Unicode("linter", default=None, optional=True),
)
def lint(self, data):
data["level"] = level_name
self._log_data("lint", data)
lint.__doc__ = """Log an error resulting from a failed lint check
:param linter: name of the linter that flagged this error
:param path: path to the file containing the error
:param message: text describing the error
:param lineno: line number that contains the error
:param column: column containing the error
:param hint: suggestion for fixing the error (optional)
:param source: source code context of the error (optional)
:param rule: name of the rule that was violated (optional)
:param lineoffset: denotes an error spans multiple lines, of the form
(<lineno offset>, <num lines>) (optional)
"""
lint.__name__ = str("lint_%s" % level_name)
return lint
# Create all the methods on StructuredLog for log/lint levels
for level_name in log_levels:
setattr(StructuredLogger, level_name.lower(), _log_func(level_name))
for level_name in lint_levels:
level_name = level_name.lower()
name = "lint_%s" % level_name
setattr(StructuredLogger, name, _lint_func(level_name))
class StructuredLogFileLike(object):
"""Wrapper for file-like objects to redirect writes to logger
instead. Each call to `write` becomes a single log entry of type `log`.
When using this it is important that the callees i.e. the logging
handlers do not themselves try to write to the wrapped file as this
will cause infinite recursion.
:param logger: `StructuredLogger` to which to redirect the file write operations.
:param level: log level to use for each write.
:param prefix: String prefix to prepend to each log entry.
"""
def __init__(self, logger, level="info", prefix=None):
self.logger = logger
self.log_func = getattr(self.logger, level)
self.prefix = prefix
def write(self, data):
if data.endswith("\n"):
data = data[:-1]
if data.endswith("\r"):
data = data[:-1]
if self.prefix is not None:
data = "%s: %s" % (self.prefix, data)
self.log_func(data)
def flush(self):
pass