folly/build/fbcode_builder/getdeps/platform.py

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-unsafe

import os

import platform
import re
import shlex
import sys
from typing import Optional, Tuple


def is_windows() -> bool:
    """Report whether the interpreter is currently running on Windows.

    The decision is a prefix match of ``sys.platform`` against ``"win"``.
    """
    running_on = sys.platform
    return running_on[:3] == "win"


def get_linux_type(
    os_release_path: str = "/etc/os-release",
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Identify the Linux distribution described by an os-release file.

    Args:
        os_release_path: File to parse; defaults to ``/etc/os-release``.

    Returns:
        ``("linux", name, version_id)`` when the file can be read, where
        ``name`` is the ``NAME`` value lower-cased, with the substring
        "linux" removed, surrounding whitespace stripped and spaces turned
        into underscores, and ``version_id`` is the lower-cased
        ``VERSION_ID`` value. Either element is ``None`` when its key is
        absent. ``(None, None, None)`` is returned when the file cannot be
        opened or read.
    """
    try:
        # Decode as UTF-8 and replace undecodable bytes so a stray byte in
        # the file cannot raise UnicodeDecodeError, which the handler below
        # would not catch.
        with open(os_release_path, encoding="utf-8", errors="replace") as f:
            data = f.read()
    except EnvironmentError:
        return (None, None, None)

    os_vars = {}
    for line in data.splitlines():
        key, sep, raw_value = line.partition("=")
        if not sep:
            # Not a KEY=VALUE assignment (blank line, bare word, ...).
            continue
        try:
            value_parts = shlex.split(raw_value.strip())
        except ValueError:
            # shlex rejects unbalanced quotes; ignore just this line rather
            # than failing the whole detection.
            continue
        # Only the first shell word of the value is kept.
        os_vars[key.strip()] = value_parts[0] if value_parts else ""

    name = os_vars.get("NAME")
    if name:
        name = name.lower()
        name = re.sub("linux", "", name)
        name = name.strip().replace(" ", "_")

    version_id = os_vars.get("VERSION_ID")
    if version_id:
        version_id = version_id.lower()

    return "linux", name, version_id


# Ideally we'd use a common library like `psutil` to read system information,
# but getdeps can't take third-party dependencies.


def _get_available_ram_linux() -> int:
    # TODO: Ideally, this function would inspect the current cgroup for any
    # limits, rather than solely relying on system RAM.

    meminfo_path = "/proc/meminfo"
    try:
        with open(meminfo_path) as f:
            for line in f:
                try:
                    key, value = line.split(":", 1)
                except ValueError:
                    continue
                suffix = " kB\n"
                if key == "MemAvailable" and value.endswith(suffix):
                    value = value[: -len(suffix)]
                    try:
                        return int(value) // 1024
                    except ValueError:
                        continue
    except OSError:
        print("error opening {}".format(meminfo_path), end="", file=sys.stderr)
    else:
        print(
            "{} had no valid MemAvailable".format(meminfo_path), end="", file=sys.stderr
        )

    guess = 8
    print(", guessing {} GiB".format(guess), file=sys.stderr)
    return guess * 1024


def _get_available_ram_macos() -> int:
    import ctypes.util

    libc = ctypes.CDLL(ctypes.util.find_library("libc"), use_errno=True)
    sysctlbyname = libc.sysctlbyname
    sysctlbyname.restype = ctypes.c_int
    sysctlbyname.argtypes = [
        ctypes.c_char_p,
        ctypes.c_void_p,
        ctypes.POINTER(ctypes.c_size_t),
        ctypes.c_void_p,
        ctypes.c_size_t,
    ]
    # TODO: There may be some way to approximate an availability
    # metric, but just use total RAM for now.
    memsize = ctypes.c_int64()
    memsizesize = ctypes.c_size_t(8)
    res = sysctlbyname(
        b"hw.memsize", ctypes.byref(memsize), ctypes.byref(memsizesize), None, 0
    )
    if res != 0:
        raise NotImplementedError(
            f"failed to retrieve hw.memsize sysctl: {ctypes.get_errno()}"
        )
    return memsize.value // (1024 * 1024)


def _get_available_ram_windows() -> int:
    import ctypes

    DWORD = ctypes.c_uint32
    QWORD = ctypes.c_uint64

    class MEMORYSTATUSEX(ctypes.Structure):
        _fields_ = [
            ("dwLength", DWORD),
            ("dwMemoryLoad", DWORD),
            ("ullTotalPhys", QWORD),
            ("ullAvailPhys", QWORD),
            ("ullTotalPageFile", QWORD),
            ("ullAvailPageFile", QWORD),
            ("ullTotalVirtual", QWORD),
            ("ullAvailVirtual", QWORD),
            ("ullExtendedVirtual", QWORD),
        ]

    ms = MEMORYSTATUSEX()
    ms.dwLength = ctypes.sizeof(ms)
    # pyre-ignore[16]
    res = ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(ms))
    if res == 0:
        raise NotImplementedError("error calling GlobalMemoryStatusEx")

    # This is fuzzy, but AvailPhys is too conservative, and AvailTotal is too
    # aggressive, so average the two. It's okay for builds to use some swap.
    return (ms.ullAvailPhys + ms.ullTotalPhys) // (2 * 1024 * 1024)


def _get_available_ram_freebsd() -> int:
    import ctypes.util

    libc = ctypes.CDLL(ctypes.util.find_library("libc"), use_errno=True)
    sysctlbyname = libc.sysctlbyname
    sysctlbyname.restype = ctypes.c_int
    sysctlbyname.argtypes = [
        ctypes.c_char_p,
        ctypes.c_void_p,
        ctypes.POINTER(ctypes.c_size_t),
        ctypes.c_void_p,
        ctypes.c_size_t,
    ]
    # hw.usermem is pretty close to what we want.
    memsize = ctypes.c_int64()
    memsizesize = ctypes.c_size_t(8)
    res = sysctlbyname(
        b"hw.usermem", ctypes.byref(memsize), ctypes.byref(memsizesize), None, 0
    )
    if res != 0:
        raise NotImplementedError(
            f"failed to retrieve hw.memsize sysctl: {ctypes.get_errno()}"
        )
    return memsize.value // (1024 * 1024)


def get_available_ram() -> int:
    """
    Return an available-RAM figure in MiB for the current platform.

    Dispatches on ``sys.platform`` to the Linux, macOS, Windows or FreeBSD
    implementation and raises NotImplementedError for any other platform.
    """
    host = sys.platform
    if host == "linux":
        return _get_available_ram_linux()
    if host == "darwin":
        return _get_available_ram_macos()
    if host == "win32":
        return _get_available_ram_windows()
    # FreeBSD is matched by prefix; the others above are exact matches.
    if host.startswith("freebsd"):
        return _get_available_ram_freebsd()
    raise NotImplementedError(
        f"platform {sys.platform} does not have an implementation of get_available_ram"
    )


def is_current_host_arm() -> bool:
    """Report whether the machine running this process has an ARM CPU.

    On macOS the kernel version string from ``os.uname()`` is searched for
    "ARM64"; elsewhere the lower-cased ``platform.machine()`` value is
    searched for "arm" or "aarch".
    """
    if not sys.platform.startswith("darwin"):
        cpu_name = platform.machine().lower()
        return any(marker in cpu_name for marker in ("arm", "aarch"))

    # platform.machine() can be fooled by rosetta for python < 3.9.2
    kernel_version = os.uname().version
    return "ARM64" in kernel_version


class HostType(object):
    """Describes a host platform as an (ostype, distro, distrovers) triple.

    Constructed with no ``ostype``, the instance describes the machine the
    process is running on. Constructed with an explicit ``ostype``, it
    simply records the values given and ``isarm`` is always ``False``.
    """

    def __init__(
        self,
        ostype: Optional[str] = None,
        distro: Optional[str] = None,
        distrovers: Optional[str] = None,
    ) -> None:
        # Maybe we should allow callers to indicate whether this machine uses
        # an ARM architecture, but we need to change HostType serialization
        # and deserialization in that case and hunt down anywhere that is
        # persisting that serialized data.
        isarm = False

        if ostype is None:
            # Auto-detect: any distro/distrovers passed by the caller are
            # discarded and replaced by what detection finds.
            distro = None
            distrovers = None
            if sys.platform.startswith("linux"):
                ostype, distro, distrovers = get_linux_type()
            elif sys.platform.startswith("darwin"):
                ostype = "darwin"
            elif is_windows():
                ostype = "windows"
                # pyre-fixme[16]: Module `sys` has no attribute `getwindowsversion`.
                distrovers = str(sys.getwindowsversion().major)
            elif sys.platform.startswith("freebsd"):
                ostype = "freebsd"
            else:
                ostype = sys.platform

            isarm = is_current_host_arm()

        # The operating system type
        self.ostype = ostype
        # The distribution, if applicable
        self.distro = distro
        # The OS/distro version if known
        self.distrovers = distrovers
        # Does the CPU use an ARM architecture? ARM includes Apple Silicon
        # Macs as well as other ARM systems that might be running Linux or
        # something.
        self.isarm = isarm

    def is_windows(self) -> bool:
        """Return True when ``ostype`` is "windows"."""
        return self.ostype == "windows"

    # is_arm is kinda half implemented at the moment. This method is only
    # intended to be used when HostType represents information about the
    # current machine we are running on.
    # When HostType is being used to enumerate platform types (represent
    # information about machine types that we may or may not be running on)
    # the result could be nonsense (under the current implementation it's
    # always false.)
    def is_arm(self) -> bool:
        """Return the ARM flag recorded at construction time."""
        return self.isarm

    def is_darwin(self) -> bool:
        """Return True when ``ostype`` is "darwin"."""
        return self.ostype == "darwin"

    def is_linux(self) -> bool:
        """Return True when ``ostype`` is "linux"."""
        return self.ostype == "linux"

    def is_freebsd(self) -> bool:
        """Return True when ``ostype`` is "freebsd"."""
        return self.ostype == "freebsd"

    def as_tuple_string(self) -> str:
        """Serialize as "ostype-distro-distrovers".

        A missing (falsy) distro or version is written as the literal
        "none".
        """
        return "%s-%s-%s" % (
            self.ostype,
            self.distro or "none",
            self.distrovers or "none",
        )

    def get_package_manager(self) -> Optional[str]:
        """Return the package manager family for this host, if known.

        Returns "homebrew" on darwin; on linux returns "rpm", "deb" or
        "pacman-package" depending on the distro; otherwise ``None``.
        """
        if self.is_darwin():
            return "homebrew"
        if not self.is_linux():
            return None
        distro = self.distro
        if not distro:
            # A Linux host whose distro could not be determined has no known
            # package manager; without this guard the startswith() call
            # below would fail on None.
            return None
        if distro in ("fedora", "centos", "centos_stream", "rocky"):
            return "rpm"
        if distro.startswith(("debian", "ubuntu", "pop!_os", "mint")):
            return "deb"
        if distro == "arch":
            return "pacman-package"
        return None

    @staticmethod
    def from_tuple_string(s) -> "HostType":
        """Build a HostType from an "ostype-distro-distrovers" string.

        The three fields are stored exactly as they appear in ``s``; a
        "none" field is kept as the string "none". Raises ValueError if
        ``s`` does not split into exactly three "-"-separated parts.
        """
        ostype, distro, distrovers = s.split("-")
        return HostType(ostype=ostype, distro=distro, distrovers=distrovers)

    def __eq__(self, b):
        # Defer to the other operand for unrelated types instead of failing
        # on a missing attribute (e.g. when compared against None).
        if not isinstance(b, HostType):
            return NotImplemented
        return (
            self.ostype == b.ostype
            and self.distro == b.distro
            and self.distrovers == b.distrovers
        )

    def __hash__(self):
        # Defining __eq__ alone makes instances unhashable; hash the same
        # fields that equality compares.
        return hash((self.ostype, self.distro, self.distrovers))