@@ -3,6 +3,7 @@
TEST_PROGS = \
csum.py \
devlink_port_split.py \
+ devmem.py \
ethtool.sh \
ethtool_extended_state.sh \
ethtool_mm.sh \
new file mode 100755
@@ -0,0 +1,45 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+
+from lib.py import ksft_run, ksft_exit
+from lib.py import ksft_eq, KsftSkipEx
+from lib.py import NetDrvEpEnv
+from lib.py import bkg, cmd, rand_port, wait_port_listen
+from lib.py import ksft_disruptive
+
+
+def require_devmem(cfg):
+ if not hasattr(cfg, "_devmem_probed"):
+ port = rand_port()
+ probe_command = f"./ncdevmem -f {cfg.ifname}"
+ cfg._devmem_supported = cmd(probe_command, fail=False, shell=True).ret == 0
+ cfg._devmem_probed = True
+
+ if not cfg._devmem_supported:
+ raise KsftSkipEx("Test requires devmem support")
+
+
+@ksft_disruptive
+def check_rx(cfg) -> None:
+ cfg.require_v6()
+ require_devmem(cfg)
+
+ port = rand_port()
+ listen_cmd = f"./ncdevmem -l -f {cfg.ifname} -s {cfg.v6} -p {port}"
+
+ with bkg(listen_cmd) as socat:
+ wait_port_listen(port)
+ cmd(f"echo -e \"hello\\nworld\"| socat -u - TCP6:[{cfg.v6}]:{port}", host=cfg.remote, shell=True)
+
+ ksft_eq(socat.stdout.strip(), "hello\nworld")
+
+
+def main() -> None:
+ with NetDrvEpEnv(__file__) as cfg:
+ ksft_run([check_rx],
+ args=(cfg, ))
+ ksft_exit()
+
+
+if __name__ == "__main__":
+ main()