linux/tools/testing/selftests/net/lib/py/nsim.py

# SPDX-License-Identifier: GPL-2.0

import errno
import json
import os
import random
import re
import time

from .utils import cmd, ip


class NetdevSim:
    """
    One netdevsim port's netdevice, together with the attributes read
    for it at construction time (interface name, ifindex, debugfs dir).
    """

    def __init__(self, nsimdev, port_index, ifname, ns=None):
        """
        Bind to the netdevice @ifname that backs port @port_index of
        @nsimdev, looking the link up with "ip -j link show" in @ns.

        Raises Exception when @ifname follows the "eni<N>np<M>" pattern
        but <M> is not port_index + 1.
        """
        # udev may have renamed the netdev according to the new naming
        # schema; when it did, the port number embedded in the name has
        # to agree with port_index.
        renamed = re.match(r"eni\d+np(\d+)", ifname)
        if renamed is not None:
            if int(renamed.group(1)) != port_index + 1:
                raise Exception("netdevice name mismatches the expected one")

        self.nsimdev = nsimdev
        self.port_index = port_index
        self.ifname = ifname
        self.ns = ns
        self.dfs_dir = "%s/ports/%u/" % (nsimdev.dfs_dir, port_index)

        # The JSON output is a list; its first entry describes the link.
        link_json = ip(f"-j link show dev {ifname}", ns=ns).stdout
        self.dev = json.loads(link_json)[0]
        self.ifindex = self.dev["ifindex"]

    def dfs_write(self, path, val):
        """
        Write @val to @path, taken relative to this port's directory,
        by delegating to the parent device's dfs_write().
        """
        port_relative = f'ports/{self.port_index}/' + path
        self.nsimdev.dfs_write(port_relative, val)


class NetdevSimDev:
    """
    Class for netdevsim bus device and its attributes.

    Constructing an instance creates the bus device; remove() (or leaving
    a "with" block) deletes it again.
    """
    @staticmethod
    def ctrl_write(path, val):
        """
        Write @val to the control file @path under /sys/bus/netdevsim/.
        """
        fullpath = os.path.join("/sys/bus/netdevsim/", path)
        with open(fullpath, "w") as f:
            f.write(val)

    def dfs_write(self, path, val):
        """
        Write @val to the file @path under this device's debugfs directory.
        """
        fullpath = os.path.join(f"/sys/kernel/debug/netdevsim/netdevsim{self.addr}/", path)
        with open(fullpath, "w") as f:
            f.write(val)

    def __init__(self, port_count=1, queue_count=1, ns=None):
        """
        Create a netdevsim device with @port_count ports and @queue_count
        queues at a randomly chosen address, wait for its netdevs, and
        optionally move it into network namespace @ns.

        If any step after the device was created fails, the device is
        deleted again before the exception propagates.
        """
        # nsim will spawn in init_net, we'll set to actual ns once we switch it there
        self.ns = None

        if not os.path.exists("/sys/bus/netdevsim"):
            cmd("modprobe netdevsim")

        addr = random.randrange(1 << 15)
        while True:
            try:
                self.ctrl_write("new_device", "%u %u %u" % (addr, port_count, queue_count))
            except OSError as e:
                # ENOSPC presumably means the address is already taken;
                # pick another random one and try again.
                if e.errno == errno.ENOSPC:
                    addr = random.randrange(1 << 15)
                    continue
                raise
            break
        self.addr = addr
        # The device exists from here on, so remove() must be able to
        # delete it even if the rest of the construction fails.
        self.removed = False
        self.nsims = []
        self.dfs_dir = "/sys/kernel/debug/netdevsim/netdevsim%u/" % addr

        try:
            # As probe of netdevsim device might happen from a workqueue,
            # wait here until all netdevs appear.
            self.wait_for_netdevs(port_count)

            if ns:
                cmd(f"devlink dev reload netdevsim/netdevsim{addr} netns {ns.name}")
                self.ns = ns

            cmd("udevadm settle", ns=self.ns)
            ifnames = self.get_ifnames()

            for port_index in range(port_count):
                self.nsims.append(self._make_port(port_index, ifnames[port_index]))
        except BaseException:
            # Do not leak the bus device when setup fails part-way.
            # Cleanup is best effort so the original failure is the one
            # that reaches the caller.
            try:
                self.remove()
            except OSError:
                pass
            raise

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.remove()

    def _make_port(self, port_index, ifname):
        """
        Build the NetdevSim object for port @port_index / netdev @ifname.
        """
        return NetdevSim(self, port_index, ifname, self.ns)

    def get_ifnames(self):
        """
        Return the sorted names listed in this device's sysfs net/
        directory, as seen from self.ns.
        """
        listing = cmd(f"ls /sys/bus/netdevsim/devices/netdevsim{self.addr}/net/",
                      ns=self.ns)
        return sorted(listing.stdout.split())

    def wait_for_netdevs(self, port_count, timeout=5):
        """
        Poll until exactly @port_count netdevs are listed for the device.

        Raises Exception if that does not happen within @timeout seconds.
        """
        # Monotonic clock: the deadline must not move if wall time is adjusted.
        deadline = time.monotonic() + timeout

        while True:
            try:
                ifnames = self.get_ifnames()
            except FileNotFoundError:
                # NOTE(review): get_ifnames() shells out through cmd();
                # confirm a missing directory really surfaces as
                # FileNotFoundError rather than a command-failure error.
                ifnames = []
            if len(ifnames) == port_count:
                return
            if time.monotonic() >= deadline:
                raise Exception("netdevices did not appear within timeout")
            # Short pause so the poll loop does not spin a CPU.
            time.sleep(0.01)

    def remove(self):
        """
        Delete the bus device; calling it again afterwards is a no-op.
        """
        if not self.removed:
            self.ctrl_write("del_device", "%u" % (self.addr, ))
            self.removed = True

    def remove_nsim(self, nsim):
        """
        Drop @nsim from self.nsims and delete its port from the device.
        """
        self.nsims.remove(nsim)
        self.ctrl_write("devices/netdevsim%u/del_port" % (self.addr, ),
                        "%u" % (nsim.port_index, ))