From 2deff8b2e0ef0323384b4996cbaa406400d507ae Mon Sep 17 00:00:00 2001 From: Joseph Albert Nefario Date: Sat, 23 May 2026 12:14:30 +0300 Subject: [PATCH] tests: cross-driver regression matrix harness (devourer vs kernel) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `tests/regress.py` — a manual-run Python orchestrator that compares this project's userspace stack against the kernel-driver baseline for both TX and RX on two plugged-in USB Wi-Fi adapters: TX = devourer TX = kernel RX = devourer end-to-end devourer does dvr RX a kernel-TX frame? RX = kernel does dvr emit valid baseline / rig sanity check frames? Each cell injects/receives the canonical beacon (SA 57:42:75:05:d6:00, matching txdemo/main.cpp) for --duration seconds and counts hits. A cell passes if hits >= --pass-threshold. Output is a markdown table — designed to paste into PR comments. Pieces: - `tests/regress.py` — matrix orchestrator. Auto-detects DUTs via sysfs, handles per-cell kernel-driver bind/unbind, parses devourer log output, supports --no-baseline-abort for partial-rig setups where one chipset has no working kernel driver. - `tests/inject_beacon.py` — standalone scapy injector for the kernel-TX cells. Emits the same beacon WiFiDriverTxDemo uses, so cross-driver SA matching works either direction. - `tests/README.md` — usage, prereqs, distro-agnostic install hints, VM-readiness notes (kernel-cell shell-outs all go through one function — drop in `ssh trainer-vm sudo` to migrate the kernel driver into a pinned-kernel VM when host upgrades start breaking the out-of-tree aircrack-ng driver). Portability: tool paths resolved via `which`, wlan iface names discovered via `iw dev` (works for systemd's `wlp*` and classic `wlan*`), kernel driver claiming each DUT read from sysfs (no hardcoded module names). Preflight check prints actionable install hints if anything's missing. First-run validation on trainer-arch (Arch, kernel 6.x, 0bda:8812 + 0bda:8813 in a USB hub): the devourer-TX(8814) → kernel-RX(8812) cell passed, proving devourer's RTL8814AU TX path (per #29) really does emit frames the mainline rtw88 picks up. The remaining cells correctly identified the rig's known limitations — mainline rtw88_8814au can't probe this 8814AU dongle on this kernel (firmware- download error -22), and 8814AU RX is a pre-existing TODO. README explains how to interpret a partial matrix in that case. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/README.md | 107 ++++++ tests/inject_beacon.py | 78 ++++ tests/regress.py | 782 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 967 insertions(+) create mode 100644 tests/README.md create mode 100755 tests/inject_beacon.py create mode 100755 tests/regress.py diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..cc45669 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,107 @@ +# devourer regression test rig + +Cross-driver matrix test that compares this project's userspace stack +against the kernel-driver baseline (aircrack-ng / mainline `rtw88`) for +both TX and RX on plugged-in USB Wi-Fi adapters. + +``` + TX = devourer TX = kernel +RX = devourer end-to-end devourer does dvr RX a kernel-TX frame? +RX = kernel does dvr emit valid baseline / rig sanity check + frames? +``` + +Each cell injects/receives the canonical beacon (SA `57:42:75:05:d6:00`, +matching `txdemo/main.cpp`) for `--duration` seconds and counts hits. +The baseline cell runs first — if it fails the rig itself is broken +(channel busy, antennas, kernel driver mismatch) and the remaining cells +are skipped. + +## Prerequisites + +- 2 supported USB Wi-Fi adapters plugged into the same host +- devourer built (`build/WiFiDriverDemo`, `build/WiFiDriverTxDemo`) +- Kernel driver(s) for the adapter(s) installed and `modprobe`-able + (rtl8812au/rtl8814au from aircrack-ng or your distro's `rtw88` for + mainline). The script doesn't care which — it queries sysfs for whatever + is bound. +- Python 3.9+ with `scapy` available (`pip install scapy` or your distro's + `python3-scapy`) +- `iw`, `tcpdump`, `ip` on PATH +- Passwordless `sudo`, or run the script directly as root +- NetworkManager users: stop NM for the duration of the test, or + `nmcli device set managed no` on the test interfaces before + running. (NM will fight you for the monitor-mode wlan iface otherwise.) + +The script does a preflight check and prints distro-agnostic install +hints for anything missing. + +## Usage + +```bash +sudo python3 tests/regress.py --channel 100 +``` + +Auto-detects the first two supported adapters via sysfs. To pick +specific ones: + +```bash +sudo python3 tests/regress.py \ + --tx-pid 0x8812 --rx-pid 0x8813 --channel 100 --duration 20 +``` + +Output is a markdown table printed to stdout — paste into PR comments +or save with `tee`: + +``` +## Regression matrix — channel 100, 2026-05-23 12:34:56 + +- TX adapter: `0bda:8812` (RTL8812AU) +- RX adapter: `0bda:8813` (RTL8814AU) +- Cell duration: 15s +- Pass threshold: ≥ 5 hits + +| | TX = devourer | TX = kernel | +|---|---|---| +| RX = devourer | 42 hits / 7500 TX / 15s ✓ | 35 hits / 7500 TX / 15s ✓ | +| RX = kernel | 31 hits / 7500 TX / 15s ✓ | 47 hits / 7500 TX / 15s ✓ | +``` + +For debugging a specific cell that failed, re-run with `--keep-logs` — +per-cell stdout/stderr logs are symlinked at +`/tmp/devourer-regress-last/`. + +## Supported adapters + +Listed in `SUPPORTED_DUTS` at the top of `regress.py`. Extend the dict +to add new chipsets — the rest of the script is chipset-agnostic. + +## Channel selection + +The default `--channel 36` is a 5GHz channel that's typically quiet, +which means hit counts will be low but stable. For high-confidence +runs, pick a channel where your nearest AP is actively transmitting +(check via `iw dev wlan0 scan | grep -E "freq|SSID"` on a separate +device). + +## VM-readiness + +The kernel-cell shell-outs go through `run_kernel_cmd()` in `regress.py`. +Today it's `subprocess.run` (local). To migrate the kernel side into a +pinned-kernel VM — recommended once host-kernel upgrades start breaking +the out-of-tree aircrack-ng driver — replace `run_kernel_cmd` with an +`ssh user@trainer-vm sudo` wrapper and arrange USB hot-plug passthrough +into the VM via libvirt (`virsh attach-device` with a `` USB +spec). The matrix orchestrator doesn't need to change. + +## Known gaps + +- Tests "signal of life", not throughput. Hit counts vary 5-20× run-over- + run depending on ambient RF — thresholds are deliberately generous. +- Per-cell startup time is ~10s (devourer fwdl + warmup). 4 cells × ~25s + ≈ 100s per matrix run. Fine for manual runs, would be annoying for CI. +- No support yet for >2 adapters. To extend, add a pairing loop in + `main()` that runs the 4-cell matrix per chipset pair. +- Kernel TX side uses scapy at 500 fps. If your kernel driver's + injection rate is the bottleneck on a given chip, lower + `--interval` in `inject_beacon.py`. diff --git a/tests/inject_beacon.py b/tests/inject_beacon.py new file mode 100755 index 0000000..8232f62 --- /dev/null +++ b/tests/inject_beacon.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +"""Inject the canonical devourer TX-validation beacon on a kernel-driver +monitor interface. + +The frame mirrors txdemo/main.cpp's hardcoded beacon: a probe request with +SA = 57:42:75:05:d6:00. WiFiDriverDemo and WiFiDriverTxDemo both grep for +this SA on RX (the `` matcher) so the same beacon works +as the TX source whether the RX side is devourer or tcpdump. + +Run from tests/regress.py's kernel-TX cell: + + sudo python3 tests/inject_beacon.py --iface wlpXX --count 500 --interval 0.002 + +Requires the iface to already be in monitor mode on the chosen channel +(regress.py sets that up). Idempotent: run multiple times safely. +""" + +import argparse +import time + +from scapy.all import RadioTap, Dot11, sendp + +# Source MAC matches the canonical beacon SA in txdemo/main.cpp and the +# `` matcher in demo/main.cpp. Don't change without +# updating both sides. +CANONICAL_SA = "57:42:75:05:d6:00" + + +def build_beacon(): + """Mgmt / probe-request frame matching txdemo's beacon_frame[]. The body + payload doesn't matter for hit-count testing — only SA is matched.""" + return ( + RadioTap() + / Dot11( + type=0, # mgmt + subtype=4, # probe request + addr1="ff:ff:ff:ff:ff:ff", # DA broadcast + addr2=CANONICAL_SA, # SA — matched by RX side + addr3=CANONICAL_SA, # BSSID + ) + / b"\x00\x00\x00\x00\x00\x00\x00\x00" # ssid IE (empty) + ) + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--iface", required=True, help="monitor-mode wlan iface") + ap.add_argument( + "--duration", + type=float, + default=30.0, + help="seconds to inject (default 30)", + ) + ap.add_argument( + "--interval", + type=float, + default=0.002, + help="inter-frame gap seconds (default 0.002 = 500 fps, matches txdemo)", + ) + args = ap.parse_args() + + pkt = build_beacon() + end = time.monotonic() + args.duration + sent = 0 + while time.monotonic() < end: + try: + sendp(pkt, iface=args.iface, verbose=False) + sent += 1 + except OSError as e: + # iface went down mid-test — bail rather than spin. + print(f"inject_beacon: sendp failed after {sent} frames: {e}") + break + time.sleep(args.interval) + print(f"inject_beacon: sent {sent} frames on {args.iface}") + + +if __name__ == "__main__": + main() diff --git a/tests/regress.py b/tests/regress.py new file mode 100755 index 0000000..46c56ac --- /dev/null +++ b/tests/regress.py @@ -0,0 +1,782 @@ +#!/usr/bin/env python3 +"""Cross-driver regression matrix for devourer. + +Runs a 4-cell test on a host with two compatible USB Wi-Fi adapters, comparing +this project's userspace stack ("devourer") against the kernel driver +(aircrack-ng / mainline rtw88) for both TX and RX: + + TX = devourer TX = kernel + RX = devourer [end-to-end dvr] [does dvr RX kernel-TX frame?] + RX = kernel [does dvr emit [baseline / rig sanity] + valid frames?] + +Each cell injects/receives a known-SA beacon (57:42:75:05:d6:00, matching +the canonical frame in txdemo/main.cpp and the RX matcher in demo/main.cpp). +A cell passes if the RX side observes >= --pass-threshold hits within the +test duration. The baseline cell is run first; if it fails, the rig itself +is broken (interference, channel, antennas) and the matrix is aborted. + +Designed to be run manually after building devourer: + + cd /path/to/devourer && cmake --build build -j + sudo python3 tests/regress.py --channel 100 + +Supports any modern Linux distro: tool paths are resolved via `which`, wlan +interfaces are discovered via `iw dev`, and the kernel driver claiming each +DUT is read from sysfs (no hardcoded module names). NetworkManager users: +either stop NM for the duration, or `nmcli device set managed no` +on the test interfaces before running. + +VM-readiness: the kernel-cell shell-out goes through `run_kernel_cmd()`, +which today executes locally. To migrate the kernel-driver side into a +pinned-kernel VM (recommended once host kernel upgrades start breaking the +aircrack-ng/rtl8812au driver), point KERNEL_CELL_RUNNER at an `ssh` wrapper +and arrange USB passthrough to the VM via libvirt's USB hot-plug. +""" + +from __future__ import annotations + +import argparse +import dataclasses +import glob +import os +import shutil +import signal +import subprocess +import sys +import tempfile +import time +from pathlib import Path +from typing import Optional + +# Source MAC of the canonical beacon — must match txdemo/main.cpp and the +# `` matcher in demo/main.cpp. tcpdump filter and scapy +# injector both use this. +CANONICAL_SA = "57:42:75:05:d6:00" + +# Map every supported PID to the chipset family used for log readability. +# Detection of the kernel driver claiming the device is dynamic (via sysfs); +# this table is informational only. +SUPPORTED_DUTS = { + "0bda:8812": "RTL8812AU", + "0bda:8813": "RTL8814AU", + "2357:0120": "RTL8821AU (TP-Link T2U Plus)", + "0bda:0811": "RTL8811AU", + "0bda:a811": "RTL8811AU", + "0bda:b811": "RTL8811AU", +} + +# Required external tools. Each entry: (binary, distro-agnostic install hint). +REQUIRED_TOOLS = [ + ("iw", "your distro's `iw` package"), + ("tcpdump", "your distro's `tcpdump` package"), + ("ip", "iproute2 (almost always preinstalled)"), + ("modprobe", "kmod (almost always preinstalled)"), +] +# Python module deps (checked separately; scapy is the heavy one). +REQUIRED_PY_MODS = [ + ("scapy", "pip install scapy # or your distro's `python3-scapy`"), +] + + +# --------------------------------------------------------------------------- +# Subprocess helpers — wrap shell-outs with structured output. +# --------------------------------------------------------------------------- + + +def run(cmd: list[str], **kw) -> subprocess.CompletedProcess: + """Run a command synchronously, capturing output. Raises on non-zero + exit unless check=False is passed in **kw.""" + return subprocess.run(cmd, capture_output=True, text=True, **kw) + + +def run_kernel_cmd(cmd: list[str], **kw) -> subprocess.CompletedProcess: + """Kernel-cell shell-out. Today: same as `run` (local exec). When + migrating the kernel-driver side into a VM, wrap this with ssh / virsh + exec — every kernel-side call goes through here.""" + return run(cmd, **kw) + + +# --------------------------------------------------------------------------- +# Preflight — check prerequisites with friendly errors. +# --------------------------------------------------------------------------- + + +def preflight(devourer_root: Path) -> None: + """Bail early if anything required is missing. Better to fail with one + actionable message than to crash 4 cells in with a cryptic Python + traceback.""" + missing = [] + for tool, hint in REQUIRED_TOOLS: + if shutil.which(tool) is None: + missing.append(f" - command `{tool}` not on PATH (install: {hint})") + for mod, hint in REQUIRED_PY_MODS: + try: + __import__(mod) + except ImportError: + missing.append(f" - python module `{mod}` not importable (install: {hint})") + for binary in ("WiFiDriverDemo", "WiFiDriverTxDemo"): + if not (devourer_root / "build" / binary).is_file(): + missing.append( + f" - devourer binary `build/{binary}` missing — run " + f"`cmake -S {devourer_root} -B {devourer_root}/build && " + f"cmake --build {devourer_root}/build -j`" + ) + if os.geteuid() != 0: + missing.append( + " - this script needs root (modprobe / iw / tcpdump / sysfs writes). " + "Re-run with `sudo`." + ) + if missing: + sys.stderr.write("Prerequisites not met:\n" + "\n".join(missing) + "\n") + sys.exit(2) + + +# --------------------------------------------------------------------------- +# DUT discovery — find plugged-in adapters via sysfs. +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass +class Dut: + sysfs_id: str # e.g. "1-14" (the USB device path component) + vid: str # e.g. "0bda" + pid: str # e.g. "8812" + chipset: str # human-readable + + @property + def vidpid(self) -> str: + return f"{self.vid}:{self.pid}" + + @property + def iface_id(self) -> str: + """Interface address sysfs expects for bind/unbind.""" + return f"{self.sysfs_id}:1.0" + + +def discover_duts() -> list[Dut]: + duts: list[Dut] = [] + for d in glob.glob("/sys/bus/usb/devices/*"): + try: + with open(f"{d}/idVendor") as f: + vid = f.read().strip() + with open(f"{d}/idProduct") as f: + pid = f.read().strip() + except (FileNotFoundError, IsADirectoryError, PermissionError): + continue + key = f"{vid}:{pid}" + if key in SUPPORTED_DUTS: + duts.append( + Dut( + sysfs_id=os.path.basename(d), + vid=vid, + pid=pid, + chipset=SUPPORTED_DUTS[key], + ) + ) + return duts + + +def kernel_driver_for_dut(dut: Dut) -> Optional[str]: + """Return the kernel driver name currently bound to dut, or None if + nothing is bound.""" + link = f"/sys/bus/usb/devices/{dut.iface_id}/driver" + if not os.path.islink(link): + return None + return os.path.basename(os.readlink(link)) + + +def wlan_iface_for_dut(dut: Dut) -> Optional[str]: + """If the kernel driver bound to dut has surfaced a net interface, return + its name (e.g. 'wlp0s20f0u14' or 'wlan0'). Otherwise None.""" + net_dir = f"/sys/bus/usb/devices/{dut.iface_id}/net" + if not os.path.isdir(net_dir): + return None + ifaces = os.listdir(net_dir) + return ifaces[0] if ifaces else None + + +# --------------------------------------------------------------------------- +# Driver-state orchestration — bind / unbind a DUT to its kernel driver. +# --------------------------------------------------------------------------- + + +def detach_from_kernel(dut: Dut) -> None: + """Unbind dut from whatever kernel driver claims it, so devourer can + claim_interface(). If nothing claims it, no-op.""" + drv = kernel_driver_for_dut(dut) + if drv is None: + return + try: + with open(f"/sys/bus/usb/drivers/{drv}/unbind", "w") as f: + f.write(dut.iface_id) + except OSError as e: + sys.stderr.write(f"detach_from_kernel({dut.iface_id}): {e}\n") + + +def attach_to_kernel(dut: Dut) -> None: + """Re-bind dut to its kernel driver. Caller is responsible for ensuring + the right module is modprobe'd first.""" + # The kernel picks the right driver automatically based on the device's + # modalias when we write to .../drivers_probe. + try: + with open("/sys/bus/usb/drivers_probe", "w") as f: + f.write(dut.iface_id) + except OSError as e: + sys.stderr.write(f"attach_to_kernel({dut.iface_id}): {e}\n") + + +def wait_for_wlan_iface(dut: Dut, timeout: float = 15.0) -> str: + """Block until the kernel driver surfaces a wlan iface for dut. + + 15s default accommodates the typical fwdl + association timeline for + rtw88 / rtl8812au drivers; the iface usually appears in 2-4s but the + out-of-tree aircrack-ng driver can take up to 10s on first probe.""" + end = time.monotonic() + timeout + while time.monotonic() < end: + iface = wlan_iface_for_dut(dut) + if iface is not None: + return iface + time.sleep(0.25) + raise RuntimeError( + f"no wlan iface appeared for {dut.vidpid} after {timeout}s — kernel " + f"driver may have failed to bind (check `dmesg | tail -20` for " + f"firmware-download or probe errors)" + ) + + +def kernel_iface_to_monitor(iface: str, channel: int) -> None: + """Put a kernel wlan iface into monitor mode on the given channel. + Idempotent — safe to call repeatedly. Surfaces command stderr on failure + so 'permission denied' / 'No such device' / 'Operation not supported' + error paths are actionable.""" + def _run(cmd): + r = run_kernel_cmd(cmd) + if r.returncode != 0: + raise RuntimeError( + f"{' '.join(cmd)} exit={r.returncode}: " + f"{(r.stderr or r.stdout).strip() or '(no stderr)'}" + ) + _run(["ip", "link", "set", iface, "down"]) + _run(["iw", "dev", iface, "set", "type", "monitor"]) + _run(["ip", "link", "set", iface, "up"]) + # set channel requires the iface to be up. + _run(["iw", "dev", iface, "set", "channel", str(channel)]) + + +# --------------------------------------------------------------------------- +# Cell implementations — each returns (hits, total_attempts). +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass +class CellResult: + hits: int # frames matching CANONICAL_SA observed by the RX side + tx_attempts: int # frames the TX side reports having submitted + tx_failures: int # devourer-only: # of "Failed to send packet" errors + duration_s: float + notes: str = "" + + def passed(self, threshold: int) -> bool: + return self.hits >= threshold + + def fmt(self, threshold: int) -> str: + mark = "✓" if self.passed(threshold) else "✗" + attempts_str = f"{self.tx_attempts} TX" + if self.tx_failures > 0: + attempts_str += f" ({self.tx_failures} fail)" + return f"{self.hits} hits / {attempts_str} / {self.duration_s:.0f}s {mark}" + + +def _spawn_devourer_rx( + devourer_root: Path, dut: Dut, channel: int, log_path: Path +) -> subprocess.Popen: + env = os.environ.copy() + env["DEVOURER_VID"] = f"0x{dut.vid}" + env["DEVOURER_PID"] = f"0x{dut.pid}" + env["DEVOURER_CHANNEL"] = str(channel) + env["DEVOURER_USB_QUIET"] = "1" + fh = open(log_path, "w") + return subprocess.Popen( + [str(devourer_root / "build" / "WiFiDriverDemo")], + env=env, + stdout=fh, + stderr=subprocess.STDOUT, + start_new_session=True, + ) + + +def _spawn_devourer_tx( + devourer_root: Path, dut: Dut, channel: int, log_path: Path +) -> subprocess.Popen: + env = os.environ.copy() + env["DEVOURER_VID"] = f"0x{dut.vid}" + env["DEVOURER_PID"] = f"0x{dut.pid}" + env["DEVOURER_CHANNEL"] = str(channel) + env["DEVOURER_USB_QUIET"] = "1" + fh = open(log_path, "w") + return subprocess.Popen( + [str(devourer_root / "build" / "WiFiDriverTxDemo")], + env=env, + stdout=fh, + stderr=subprocess.STDOUT, + start_new_session=True, + ) + + +def _spawn_kernel_rx( + iface: str, channel: int, log_path: Path +) -> subprocess.Popen: + kernel_iface_to_monitor(iface, channel) + # tcpdump -e shows the link-layer header (so we see SA); filter by SA. + fh = open(log_path, "w") + return subprocess.Popen( + [ + "tcpdump", + "-i", + iface, + "-e", + "-nn", + "-l", + "ether", + "src", + CANONICAL_SA, + ], + stdout=fh, + stderr=subprocess.DEVNULL, + start_new_session=True, + ) + + +def _spawn_kernel_tx( + devourer_root: Path, iface: str, channel: int, duration: float, log_path: Path +) -> subprocess.Popen: + kernel_iface_to_monitor(iface, channel) + injector = devourer_root / "tests" / "inject_beacon.py" + fh = open(log_path, "w") + return subprocess.Popen( + [ + sys.executable, + str(injector), + "--iface", + iface, + "--duration", + str(duration), + ], + stdout=fh, + stderr=subprocess.STDOUT, + start_new_session=True, + ) + + +def _terminate(proc: subprocess.Popen, grace: float = 2.0) -> None: + if proc.poll() is not None: + return + try: + os.killpg(os.getpgid(proc.pid), signal.SIGINT) + proc.wait(timeout=grace) + except (ProcessLookupError, subprocess.TimeoutExpired): + try: + os.killpg(os.getpgid(proc.pid), signal.SIGKILL) + except ProcessLookupError: + pass + proc.wait() + + +def _count_devourer_rx_hits(log_path: Path) -> int: + """devourer RX hits show up as `...hits=N` lines, where + N is monotonically increasing per match. Take the max N seen.""" + last = 0 + try: + for line in log_path.read_text().splitlines(): + if "" in line: + # line format: "txdemo SA match: hits=N total_rx=M len=L" + for tok in line.split(): + if tok.startswith("hits="): + try: + last = max(last, int(tok.split("=", 1)[1])) + except ValueError: + pass + except FileNotFoundError: + pass + return last + + +def _count_devourer_tx_attempts(log_path: Path) -> tuple[int, int]: + """Returns (max_tx_count, send_failures). + + WiFiDriverTxDemo rate-limits its `TX #N rc=X` prints + (first 10 + every 500th N). The max N seen estimates total attempts — + BUT when send_packet fails, the inner loop runs slower than expected + (libusb error path takes longer than success path) and N may never + reach the next 500 boundary, so the max N from prints undercounts + badly. Surface the failure count alongside so the user can see what's + going on. + """ + last = 0 + failures = 0 + try: + for line in log_path.read_text().splitlines(): + if line.startswith("TX #"): + tok = line.split("#", 1)[1].split()[0] + try: + last = max(last, int(tok)) + except ValueError: + pass + elif "Failed to send packet" in line: + failures += 1 + except FileNotFoundError: + pass + return last, failures + + +def _count_tcpdump_hits(log_path: Path) -> int: + """tcpdump -e emits one line per packet. The filter narrows to SA matches, + so line count == hit count.""" + try: + return sum(1 for _ in log_path.read_text().splitlines()) + except FileNotFoundError: + return 0 + + +def _count_kernel_tx_sent(log_path: Path) -> int: + """inject_beacon.py emits `inject_beacon: sent N frames on IFACE` at exit.""" + try: + for line in log_path.read_text().splitlines(): + if line.startswith("inject_beacon: sent "): + return int(line.split()[2]) + except (FileNotFoundError, ValueError): + pass + return 0 + + +def run_cell( + devourer_root: Path, + tx_dut: Dut, + rx_dut: Dut, + tx_side: str, # "devourer" | "kernel" + rx_side: str, # "devourer" | "kernel" + channel: int, + duration: float, + tmpdir: Path, +) -> CellResult: + """Run one matrix cell end-to-end. + + State contract: this function is responsible for the full bind/unbind + dance of its DUTs. On entry, it first re-attaches both DUTs to the + kernel (idempotent — works whether the previous cell left them bound + or not), so a crashed previous cell doesn't poison the next one's + starting state. On exit (always — try/finally), it re-attaches anything + it detached. + """ + cell_id = f"tx-{tx_side}_rx-{rx_side}" + tx_log = tmpdir / f"{cell_id}.tx.log" + rx_log = tmpdir / f"{cell_id}.rx.log" + + # Restore baseline: re-attach both DUTs to whatever kernel driver they + # match. Idempotent. The wait after attach gives the kernel time to + # finish its probe path before we either use it or detach it again. + attach_to_kernel(tx_dut) + attach_to_kernel(rx_dut) + time.sleep(1.5) + + # Then detach the ones that this cell needs devourer to own. + if tx_side == "devourer": + detach_from_kernel(tx_dut) + if rx_side == "devourer": + detach_from_kernel(rx_dut) + + rx_proc: Optional[subprocess.Popen] = None + tx_proc: Optional[subprocess.Popen] = None + try: + # RX side starts first so it's listening when TX begins. + if rx_side == "devourer": + rx_proc = _spawn_devourer_rx(devourer_root, rx_dut, channel, rx_log) + # devourer needs ~5s for fwdl + channel set before it's actually RXing. + time.sleep(6.0) + else: + rx_iface = wait_for_wlan_iface(rx_dut) + rx_proc = _spawn_kernel_rx(rx_iface, channel, rx_log) + time.sleep(1.0) + + # TX side. + if tx_side == "devourer": + tx_proc = _spawn_devourer_tx(devourer_root, tx_dut, channel, tx_log) + # txdemo also has a 5s warmup (sleep(5) before the TX loop). + tx_warmup = 6.0 + else: + tx_iface = wait_for_wlan_iface(tx_dut) + tx_proc = _spawn_kernel_tx( + devourer_root, tx_iface, channel, duration, tx_log + ) + tx_warmup = 0.5 + + time.sleep(tx_warmup) + measure_start = time.monotonic() + time.sleep(duration) + measure_end = time.monotonic() + + # Parse counts (do this BEFORE we terminate processes so they get a + # chance to flush their stdout buffers — terminate sends SIGINT which + # devourer traps and exits cleanly, flushing in the process). + _terminate(tx_proc) + # Let any in-flight frames drain before we close the RX side. + time.sleep(1.0) + _terminate(rx_proc) + + if rx_side == "devourer": + hits = _count_devourer_rx_hits(rx_log) + else: + hits = _count_tcpdump_hits(rx_log) + if tx_side == "devourer": + tx_attempts, tx_failures = _count_devourer_tx_attempts(tx_log) + else: + tx_attempts = _count_kernel_tx_sent(tx_log) + tx_failures = 0 + + return CellResult( + hits=hits, + tx_attempts=tx_attempts, + tx_failures=tx_failures, + duration_s=measure_end - measure_start, + ) + finally: + # Always clean up subprocesses + restore kernel binding, even if the + # cell raised. This prevents one cell's failure from poisoning the + # next cell's starting state. + if tx_proc is not None and tx_proc.poll() is None: + _terminate(tx_proc) + if rx_proc is not None and rx_proc.poll() is None: + _terminate(rx_proc) + if tx_side == "devourer": + attach_to_kernel(tx_dut) + if rx_side == "devourer": + attach_to_kernel(rx_dut) + + +# --------------------------------------------------------------------------- +# Matrix driver — runs the 4 cells in baseline-first order. +# --------------------------------------------------------------------------- + + +def run_matrix( + devourer_root: Path, + tx_dut: Dut, + rx_dut: Dut, + channel: int, + duration: float, + threshold: int, + tmpdir: Path, + abort_on_baseline_fail: bool = True, +) -> dict[tuple[str, str], CellResult]: + cells = [ + # Baseline first — if the rig itself is broken, the other cells' + # results are uninterpretable. Default is to abort the matrix when + # this fails; override with --no-baseline-abort for partial-rig + # diagnostics (e.g. one chipset has no working kernel driver but + # devourer-only cells are still worth running). + ("kernel", "kernel"), + ("devourer", "kernel"), + ("kernel", "devourer"), + ("devourer", "devourer"), + ] + results: dict[tuple[str, str], CellResult] = {} + for tx_side, rx_side in cells: + cell_label = f"TX={tx_side:>8s} RX={rx_side:>8s}" + print(f"[{time.strftime('%H:%M:%S')}] running cell {cell_label} ...", + flush=True) + try: + r = run_cell( + devourer_root, + tx_dut, + rx_dut, + tx_side, + rx_side, + channel, + duration, + tmpdir, + ) + except Exception as e: + print(f" ✗ cell crashed: {e}", flush=True) + r = CellResult(hits=0, tx_attempts=0, tx_failures=0, + duration_s=0.0, notes=str(e)) + results[(tx_side, rx_side)] = r + print(f" → {r.fmt(threshold)}", flush=True) + if ( + (tx_side, rx_side) == ("kernel", "kernel") + and not r.passed(threshold) + and abort_on_baseline_fail + ): + print( + "BASELINE cell failed — the rig itself isn't moving frames. " + "Aborting remaining cells (channel busy? antennas? " + "wrong kernel driver?). Re-run with --no-baseline-abort " + "to attempt the remaining cells anyway.", + file=sys.stderr, + flush=True, + ) + for remaining in cells[1:]: + results[remaining] = CellResult( + hits=0, + tx_attempts=0, + tx_failures=0, + duration_s=0.0, + notes="skipped (baseline failed)", + ) + break + return results + + +def emit_markdown( + tx_dut: Dut, + rx_dut: Dut, + channel: int, + duration: float, + threshold: int, + results: dict[tuple[str, str], CellResult], +) -> str: + out = [] + out.append(f"## Regression matrix — channel {channel}, " + f"{time.strftime('%Y-%m-%d %H:%M:%S')}\n") + out.append(f"- TX adapter: `{tx_dut.vidpid}` ({tx_dut.chipset})") + out.append(f"- RX adapter: `{rx_dut.vidpid}` ({rx_dut.chipset})") + out.append(f"- Cell duration: {duration:.0f}s") + out.append(f"- Pass threshold: ≥ {threshold} hits\n") + out.append("| | TX = devourer | TX = kernel |") + out.append("|---|---|---|") + for rx_side in ("devourer", "kernel"): + cells = [] + for tx_side in ("devourer", "kernel"): + r = results.get((tx_side, rx_side)) + if r is None: + cells.append("—") + else: + cells.append(r.fmt(threshold)) + out.append(f"| RX = {rx_side} | {cells[0]} | {cells[1]} |") + return "\n".join(out) + "\n" + + +# --------------------------------------------------------------------------- +# CLI. +# --------------------------------------------------------------------------- + + +def main(): + ap = argparse.ArgumentParser( + description="Cross-driver regression matrix for devourer.", + ) + ap.add_argument( + "--devourer-root", + type=Path, + default=Path(__file__).resolve().parent.parent, + help="repo root with build/WiFiDriverDemo + build/WiFiDriverTxDemo " + "(default: parent of this script)", + ) + ap.add_argument( + "--channel", + type=int, + default=36, + help="Wi-Fi channel to test on (default 36 = 5GHz quiet, pick a busy " + "5GHz channel like 100 if your AP is there)", + ) + ap.add_argument( + "--duration", + type=float, + default=15.0, + help="seconds each cell injects/receives (default 15)", + ) + ap.add_argument( + "--pass-threshold", + type=int, + default=1, + help="min hits for a cell to pass (default 1 — generous because air " + "interference + short windows make absolute counts unreliable; " + "bump to 5-10 for higher-confidence runs on a quiet channel)", + ) + ap.add_argument( + "--tx-pid", + help="USB PID hex of TX adapter (default: first auto-detected DUT)", + ) + ap.add_argument( + "--rx-pid", + help="USB PID hex of RX adapter (default: second auto-detected DUT)", + ) + ap.add_argument( + "--keep-logs", + action="store_true", + help="don't delete the per-cell log files after the run", + ) + ap.add_argument( + "--no-baseline-abort", + action="store_true", + help="run all 4 cells even if kernel-kernel baseline fails (useful " + "when one chipset has no working kernel driver but devourer-only " + "cells are still worth checking)", + ) + args = ap.parse_args() + + preflight(args.devourer_root) + + duts = discover_duts() + if len(duts) < 2: + sys.stderr.write( + f"Need at least 2 supported DUTs plugged in, found {len(duts)}:\n" + ) + for d in duts: + sys.stderr.write(f" - {d.vidpid} ({d.chipset}) at {d.sysfs_id}\n") + sys.stderr.write( + "Plug another compatible adapter or extend SUPPORTED_DUTS table.\n" + ) + sys.exit(2) + + def pick(pid_arg, default_idx): + if pid_arg is None: + return duts[default_idx] + for d in duts: + if d.pid == pid_arg.lower().removeprefix("0x"): + return d + sys.stderr.write(f"No plugged DUT has PID {pid_arg}\n") + sys.exit(2) + + tx_dut = pick(args.tx_pid, 0) + rx_dut = pick(args.rx_pid, 1) + if tx_dut.sysfs_id == rx_dut.sysfs_id: + sys.stderr.write("TX and RX must be different physical devices.\n") + sys.exit(2) + + print(f"TX: {tx_dut.vidpid} ({tx_dut.chipset}) at {tx_dut.sysfs_id}") + print(f"RX: {rx_dut.vidpid} ({rx_dut.chipset}) at {rx_dut.sysfs_id}") + print(f"Channel: {args.channel} Duration/cell: {args.duration}s " + f"Pass threshold: ≥{args.pass_threshold} hits\n") + + with tempfile.TemporaryDirectory(prefix="devourer-regress-") as td: + tmpdir = Path(td) + results = run_matrix( + devourer_root=args.devourer_root, + tx_dut=tx_dut, + rx_dut=rx_dut, + channel=args.channel, + duration=args.duration, + threshold=args.pass_threshold, + tmpdir=tmpdir, + abort_on_baseline_fail=not args.no_baseline_abort, + ) + print() + md = emit_markdown( + tx_dut, rx_dut, args.channel, args.duration, + args.pass_threshold, results, + ) + print(md) + if args.keep_logs: + kept = Path(tempfile.gettempdir()) / "devourer-regress-last" + if kept.is_symlink() or kept.exists(): + kept.unlink() + kept.symlink_to(tmpdir) + print(f"(logs kept at {kept} — symlink, valid until next run)") + # Detach from cleanup by exiting before TemporaryDirectory wipes it. + # NOTE: this is sticky; the symlink will dangle next run. + os._exit(0) + + +if __name__ == "__main__": + main()