chromium/third_party/wpt_tools/wpt/tools/wptrunner/wptrunner/instruments.py

# mypy: allow-untyped-defs

import time
import threading

from . import mpcontext

"""Instrumentation for measuring high-level time spent on various tasks inside the runner.

This is lower fidelity than an actual profile, but allows custom data to be considered,
so that we can see the time spent in specific tests and test directories.


Instruments are intended to be used as context managers with the return value of __enter__
containing the user-facing API e.g.

with Instrument(*args) as recording:
    recording.set(["init"])
    do_init()
    recording.pause()
    for thread in test_threads:
       thread.start(recording, *args)
    for thread in test_threads:
       thread.join()
    recording.set(["teardown"])   # un-pauses the Instrument
    do_teardown()
"""

class NullInstrument:
    def set(self, stack):
        """Set the current task to stack

        :param stack: A list of strings defining the current task.
                      These are interpreted like a stack trace so that ["foo"] and
                      ["foo", "bar"] both show up as descendants of "foo"
        """
        pass

    def pause(self):
        """Stop recording a task on the current thread. This is useful if the thread
        is purely waiting on the results of other threads"""
        pass

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        return


class InstrumentWriter:
    def __init__(self, queue):
        self.queue = queue

    def set(self, stack):
        stack.insert(0, threading.current_thread().name)
        stack = self._check_stack(stack)
        self.queue.put(("set", threading.current_thread().ident, time.time(), stack))

    def pause(self):
        self.queue.put(("pause", threading.current_thread().ident, time.time(), None))

    def _check_stack(self, stack):
        assert isinstance(stack, (tuple, list))
        return [item.replace(" ", "_") for item in stack]


class Instrument:
    def __init__(self, file_path):
        """Instrument that collects data from multiple threads and sums the time in each
        thread. The output is in the format required by flamegraph.pl to enable visualisation
        of the time spent in each task.

        :param file_path: - The path on which to write instrument output. Any existing file
                            at the path will be overwritten
        """
        self.path = file_path
        self.queue = None
        self.current = None
        self.start_time = None
        self.instrument_proc = None

    def __enter__(self):
        assert self.instrument_proc is None
        assert self.queue is None
        mp = mpcontext.get_context()
        self.queue = mp.Queue()
        self.instrument_proc = mp.Process(target=self.run)
        self.instrument_proc.start()
        return InstrumentWriter(self.queue)

    def __exit__(self, *args, **kwargs):
        self.queue.put(("stop", None, time.time(), None))
        self.instrument_proc.join()
        self.instrument_proc = None
        self.queue = None

    def run(self):
        known_commands = {"stop", "pause", "set"}
        with open(self.path, "w") as f:
            thread_data = {}
            while True:
                command, thread, time_stamp, stack = self.queue.get()
                assert command in known_commands

                # If we are done recording, dump the information from all threads to the file
                # before exiting. Otherwise for either 'set' or 'pause' we only need to dump
                # information from the current stack (if any) that was recording on the reporting
                # thread (as that stack is no longer active).
                items = []
                if command == "stop":
                    items = thread_data.values()
                elif thread in thread_data:
                    items.append(thread_data.pop(thread))
                for output_stack, start_time in items:
                    f.write("%s %d\n" % (";".join(output_stack), int(1000 * (time_stamp - start_time))))

                if command == "set":
                    thread_data[thread] = (stack, time_stamp)
                elif command == "stop":
                    break