# cpython/Lib/test/libregrtest/win_utils.py

import _overlapped
import _thread
import _winapi
import math
import struct
import winreg


# Seconds per measurement.  The damping factor below is derived from this
# value, i.e. it assumes one sample is taken every SAMPLING_INTERVAL seconds.
SAMPLING_INTERVAL = 1
# Exponential damping factor to compute exponentially weighted moving average
# on 1 minute (60 seconds): exp(-SAMPLING_INTERVAL / 60).  It is the weight
# kept by the previous load value; each new sample gets the weight
# (1 - LOAD_FACTOR_1).
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
# Initialize the load using the arithmetic mean of the first NVALUE values
# of the Processor Queue Length
NVALUE = 5


class WindowsLoadTracker():
    """
    This class asynchronously reads the performance counters to calculate
    the system load on Windows.  A "raw" thread is used here to prevent
    interference with the test suite's cases for the threading module.

    The worker thread samples the 'Processor Queue Length' counter and folds
    it into an exponentially weighted moving average that can be read with
    getloadavg().

    The worker thread runs a bound method and therefore keeps a reference
    to the instance: call close() explicitly to stop it instead of relying
    on garbage collection.
    """

    def __init__(self):
        """Check access to the performance data and start the sampler thread.

        Raises PermissionError if the performance data cannot be read.
        """
        # make __del__ not fail if pre-flight test fails
        self._running = None
        self._stopped = None

        # Pre-flight test for access to the performance data;
        # `PermissionError` will be raised if not allowed
        winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA)

        # first samples, used to seed the moving average
        self._values = []
        # current load; None until NVALUE samples have been collected
        self._load = None

        # Manual-reset events, initially not signaled:
        #   running: set by __del__() to ask the thread to quit
        #   stopped: set by the thread right before it exits
        running = _overlapped.CreateEvent(None, True, False, None)
        try:
            stopped = _overlapped.CreateEvent(None, True, False, None)
        except BaseException:
            _winapi.CloseHandle(running)
            raise
        self._running = running
        self._stopped = stopped

        try:
            _thread.start_new_thread(self._update_load, (), {})
        except BaseException:
            # No thread exists that could ever signal `stopped`: reset the
            # state so that __del__() does not wait forever, and release
            # the handles.
            self._running = self._stopped = None
            _winapi.CloseHandle(running)
            _winapi.CloseHandle(stopped)
            raise

    def _update_load(self,
                     # localize module access to prevent shutdown errors
                     _wait=_winapi.WaitForSingleObject,
                     _signal=_overlapped.SetEvent):
        """Thread body: sample the load until the `running` event is set."""
        try:
            # Keep the sampling period consistent with LOAD_FACTOR_1, which
            # is derived from SAMPLING_INTERVAL.  Computed once, up front.
            timeout_ms = int(SAMPLING_INTERVAL * 1000)
            # run until signaled to stop: the wait returns 0 (WAIT_OBJECT_0)
            # once the event is set and a non-zero value on timeout
            while _wait(self._running, timeout_ms):
                self._calculate_load()
        finally:
            # notify stopped, even if sampling raised: otherwise __del__()
            # would block forever waiting for this event
            _signal(self._stopped)

    def _calculate_load(self,
                        # localize module access to prevent shutdown errors
                        _query=winreg.QueryValueEx,
                        _hkey=winreg.HKEY_PERFORMANCE_DATA,
                        _unpack=struct.unpack_from):
        """Read 'Processor Queue Length' once and update the load average.

        Does nothing if the counter is not found in the returned data.
        """
        # get the 'System' object
        data, _ = _query(_hkey, '2')
        # PERF_DATA_BLOCK {
        #   WCHAR Signature[4]      8 +
        #   DWORD LittleEndian      4 +
        #   DWORD Version           4 +
        #   DWORD Revision          4 +
        #   DWORD TotalByteLength   4 +
        #   DWORD HeaderLength      = 24 byte offset
        #   ...
        # }
        obj_start, = _unpack('L', data, 24)
        # PERF_OBJECT_TYPE {
        #   DWORD TotalByteLength
        #   DWORD DefinitionLength
        #   DWORD HeaderLength
        #   ...
        # }
        # The counter definitions start after the object header
        # (HeaderLength) and end where the counter data starts
        # (DefinitionLength).
        data_start, defn_start = _unpack('4xLL', data, obj_start)
        data_base = obj_start + data_start
        defn_base = obj_start + defn_start
        # find the 'Processor Queue Length' counter (index=44)
        while defn_base < data_base:
            # PERF_COUNTER_DEFINITION {
            #   DWORD ByteLength
            #   DWORD CounterNameTitleIndex
            #   ... [7 DWORDs/28 bytes]
            #   DWORD CounterOffset
            # }
            size, idx, offset = _unpack('LL28xL', data, defn_base)
            defn_base += size
            if idx == 44:
                counter_offset = data_base + offset
                # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD)
                processor_queue_length, = _unpack('L', data, counter_offset)
                break
        else:
            # counter not found: leave the load unchanged
            return

        # We use an exponentially weighted moving average, imitating the
        # load calculation on Unix systems.
        # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
        # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
        if self._load is not None:
            self._load = (self._load * LOAD_FACTOR_1
                          + processor_queue_length * (1.0 - LOAD_FACTOR_1))
        else:
            # Seed the average with the arithmetic mean of the first NVALUE
            # samples.  The mean is computed as soon as the last of them
            # arrives so that no sample is thrown away.
            self._values.append(processor_queue_length)
            if len(self._values) >= NVALUE:
                self._load = sum(self._values) / len(self._values)

    def close(self, kill=True):
        """Stop the sampler thread and release the event handles.

        `kill` is accepted but not used by this implementation.
        Calling close() more than once is harmless.
        """
        self.__del__()

    def __del__(self,
                # localize module access to prevent shutdown errors
                _wait=_winapi.WaitForSingleObject,
                _close=_winapi.CloseHandle,
                _signal=_overlapped.SetEvent):
        """Ask the thread to quit, wait for it and close the events."""
        if self._running is not None:
            # tell the update thread to quit
            _signal(self._running)
            # wait for the update thread to signal done
            _wait(self._stopped, -1)
            # cleanup events
            _close(self._running)
            _close(self._stopped)
            self._running = self._stopped = None

    def getloadavg(self):
        """Return the current load average.

        Returns None until enough samples were collected to seed it.
        """
        return self._load