linux/tools/testing/selftests/bpf/test_bpftool.py

# SPDX-License-Identifier: GPL-2.0
# Copyright (c) 2020 SUSE LLC.

import collections
import functools
import json
import os
import socket
import subprocess
import unittest


# Add the source tree of bpftool and /usr/local/sbin to PATH
cur_dir = os.path.dirname(os.path.realpath(__file__))
bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..",
                                           "tools", "bpf", "bpftool"))
os.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"]


class IfaceNotFoundError(Exception):
    pass


class UnprivilegedUserError(Exception):
    pass


def _bpftool(args, json=True):
    _args = ["bpftool"]
    if json:
        _args.append("-j")
    _args.extend(args)

    return subprocess.check_output(_args)


def bpftool(args):
    return _bpftool(args, json=False).decode("utf-8")


def bpftool_json(args):
    res = _bpftool(args)
    return json.loads(res)


def get_default_iface():
    for iface in socket.if_nameindex():
        if iface[1] != "lo":
            return iface[1]
    raise IfaceNotFoundError("Could not find any network interface to probe")


def default_iface(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        iface = get_default_iface()
        return f(*args, iface, **kwargs)
    return wrapper

DMESG_EMITTING_HELPERS = [
        "bpf_probe_write_user",
        "bpf_trace_printk",
        "bpf_trace_vprintk",
    ]

class TestBpftool(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        if os.getuid() != 0:
            raise UnprivilegedUserError(
                "This test suite needs root privileges")

    @default_iface
    def test_feature_dev_json(self, iface):
        unexpected_helpers = DMESG_EMITTING_HELPERS
        expected_keys = [
            "syscall_config",
            "program_types",
            "map_types",
            "helpers",
            "misc",
        ]

        res = bpftool_json(["feature", "probe", "dev", iface])
        # Check if the result has all expected keys.
        self.assertCountEqual(res.keys(), expected_keys)
        # Check if unexpected helpers are not included in helpers probes
        # result.
        for helpers in res["helpers"].values():
            for unexpected_helper in unexpected_helpers:
                self.assertNotIn(unexpected_helper, helpers)

    def test_feature_kernel(self):
        test_cases = [
            bpftool_json(["feature", "probe", "kernel"]),
            bpftool_json(["feature", "probe"]),
            bpftool_json(["feature"]),
        ]
        unexpected_helpers = DMESG_EMITTING_HELPERS
        expected_keys = [
            "syscall_config",
            "system_config",
            "program_types",
            "map_types",
            "helpers",
            "misc",
        ]

        for tc in test_cases:
            # Check if the result has all expected keys.
            self.assertCountEqual(tc.keys(), expected_keys)
            # Check if unexpected helpers are not included in helpers probes
            # result.
            for helpers in tc["helpers"].values():
                for unexpected_helper in unexpected_helpers:
                    self.assertNotIn(unexpected_helper, helpers)

    def test_feature_kernel_full(self):
        test_cases = [
            bpftool_json(["feature", "probe", "kernel", "full"]),
            bpftool_json(["feature", "probe", "full"]),
        ]
        expected_helpers = DMESG_EMITTING_HELPERS

        for tc in test_cases:
            # Check if expected helpers are included at least once in any
            # helpers list for any program type. Unfortunately we cannot assume
            # that they will be included in all program types or a specific
            # subset of programs. It depends on the kernel version and
            # configuration.
            found_helpers = False

            for helpers in tc["helpers"].values():
                if all(expected_helper in helpers
                       for expected_helper in expected_helpers):
                    found_helpers = True
                    break

            self.assertTrue(found_helpers)

    def test_feature_kernel_full_vs_not_full(self):
        full_res = bpftool_json(["feature", "probe", "full"])
        not_full_res = bpftool_json(["feature", "probe"])
        not_full_set = set()
        full_set = set()

        for helpers in full_res["helpers"].values():
            for helper in helpers:
                full_set.add(helper)

        for helpers in not_full_res["helpers"].values():
            for helper in helpers:
                not_full_set.add(helper)

        self.assertCountEqual(full_set - not_full_set,
                              set(DMESG_EMITTING_HELPERS))
        self.assertCountEqual(not_full_set - full_set, set())

    def test_feature_macros(self):
        expected_patterns = [
            r"/\*\*\* System call availability \*\*\*/",
            r"#define HAVE_BPF_SYSCALL",
            r"/\*\*\* eBPF program types \*\*\*/",
            r"#define HAVE.*PROG_TYPE",
            r"/\*\*\* eBPF map types \*\*\*/",
            r"#define HAVE.*MAP_TYPE",
            r"/\*\*\* eBPF helper functions \*\*\*/",
            r"#define HAVE.*HELPER",
            r"/\*\*\* eBPF misc features \*\*\*/",
        ]

        res = bpftool(["feature", "probe", "macros"])
        for pattern in expected_patterns:
            self.assertRegex(res, pattern)