#!/usr/bin/env python3 # SPDX-License-Identifier: GPL-2.0 import multiprocessing import socket from lib.py import ksft_run, ksft_exit, ksft_eq, ksft_ge, cmd, fd_read_timeout from lib.py import NetDrvEpEnv from lib.py import EthtoolFamily, NetdevFamily from lib.py import KsftSkipEx, KsftFailEx from lib.py import rand_port def traffic(cfg, local_port, remote_port, ipver): af_inet = socket.AF_INET if ipver == "4" else socket.AF_INET6 sock = socket.socket(af_inet, socket.SOCK_DGRAM) sock.bind(("", local_port)) sock.connect((cfg.remote_addr_v[ipver], remote_port)) tgt = f"{ipver}:[{cfg.addr_v[ipver]}]:{local_port},sourceport={remote_port}" cmd("echo a | socat - UDP" + tgt, host=cfg.remote) fd_read_timeout(sock.fileno(), 5) return sock.getsockopt(socket.SOL_SOCKET, socket.SO_INCOMING_CPU) def test_rss_input_xfrm(cfg, ipver): """ Test symmetric input_xfrm. If symmetric RSS hash is configured, send traffic twice, swapping the src/dst UDP ports, and verify that the same queue is receiving the traffic in both cases (IPs are constant). """ if multiprocessing.cpu_count() < 2: raise KsftSkipEx("Need at least two CPUs to test symmetric RSS hash") input_xfrm = cfg.ethnl.rss_get( {'header': {'dev-name': cfg.ifname}}).get('input_xfrm') # Check for symmetric xor/or-xor if not input_xfrm or (input_xfrm != 1 and input_xfrm != 2): raise KsftSkipEx("Symmetric RSS hash not requested") cpus = set() successful = 0 for _ in range(100): try: port1 = rand_port(socket.SOCK_DGRAM) port2 = rand_port(socket.SOCK_DGRAM) cpu1 = traffic(cfg, port1, port2, ipver) cpu2 = traffic(cfg, port2, port1, ipver) cpus.update([cpu1, cpu2]) ksft_eq( cpu1, cpu2, comment=f"Received traffic on different cpus with ports ({port1 = }, {port2 = }) while symmetric hash is configured") successful += 1 if successful == 10: break except: continue else: raise KsftFailEx("Failed to run traffic") ksft_ge(len(cpus), 2, comment=f"Received traffic on less than two cpus {cpus = }") def test_rss_input_xfrm_ipv4(cfg): cfg.require_ipver("4") test_rss_input_xfrm(cfg, "4") def test_rss_input_xfrm_ipv6(cfg): cfg.require_ipver("6") test_rss_input_xfrm(cfg, "6") def main() -> None: with NetDrvEpEnv(__file__, nsim_test=False) as cfg: cfg.ethnl = EthtoolFamily() cfg.netdevnl = NetdevFamily() ksft_run([test_rss_input_xfrm_ipv4, test_rss_input_xfrm_ipv6], args=(cfg, )) ksft_exit() if __name__ == "__main__": main()