@@ -110,6 +110,11 @@ from .remote import Remote
self.addr = self.v6 if self.v6 else self.v4
self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
+ self.addr_ipver = "6" if self.v6 else "4"
+ # Bracketed addresses, some commands need IPv6 to be inside []
+ self.baddr = f"[{self.v6}]" if self.v6 else self.v4
+ self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4
+
self.ifname = self.dev['ifname']
self.ifindex = self.dev['ifindex']
@@ -2,8 +2,9 @@
# SPDX-License-Identifier: GPL-2.0
from lib.py import ksft_run, ksft_exit
+from lib.py import ksft_eq
from lib.py import NetDrvEpEnv
-from lib.py import cmd
+from lib.py import bkg, cmd, wait_port_listen, rand_port
def test_v4(cfg) -> None:
@@ -16,6 +17,24 @@ from lib.py import cmd
cmd(f"ping -c 1 -W0.5 {cfg.v6}", host=cfg.remote)
+def test_tcp(cfg) -> None:
+ port = rand_port()
+ listen_cmd = f"socat -{cfg.addr_ipver} -t 2 -u TCP-LISTEN:{port},reuseport STDOUT"
+
+ with bkg(listen_cmd, exit_wait=True) as nc:
+ wait_port_listen(port)
+
+ cmd(f"echo ping | socat -t 2 -u STDIN TCP:{cfg.baddr}:{port}",
+ shell=True, host=cfg.remote)
+ ksft_eq(nc.stdout.strip(), "ping")
+
+ with bkg(listen_cmd, host=cfg.remote, exit_wait=True) as nc:
+ wait_port_listen(port, host=cfg.remote)
+
+ cmd(f"echo ping | socat -t 2 -u STDIN TCP:{cfg.remote_baddr}:{port}", shell=True)
+ ksft_eq(nc.stdout.strip(), "ping")
+
+
def main() -> None:
with NetDrvEpEnv(__file__) as cfg:
ksft_run(globs=globals(), case_pfx={"test_"}, args=(cfg, ))
@@ -1,7 +1,11 @@
# SPDX-License-Identifier: GPL-2.0
import json as _json
+import random
+import re
import subprocess
+import time
+
class cmd:
def __init__(self, comm, shell=True, fail=True, ns=None, background=False, host=None):
@@ -38,6 +42,20 @@ import subprocess
(self.proc.args, stdout, stderr))
+class bkg(cmd):
+ def __init__(self, comm, shell=True, fail=True, ns=None, host=None,
+ exit_wait=False):
+ super().__init__(comm, background=True,
+ shell=shell, fail=fail, ns=ns, host=host)
+ self.terminate = not exit_wait
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, ex_type, ex_value, ex_tb):
+ return self.process(terminate=self.terminate)
+
+
def ip(args, json=None, ns=None, host=None):
cmd_str = "ip "
if json:
@@ -47,3 +65,28 @@ import subprocess
if json:
return _json.loads(cmd_obj.stdout)
return cmd_obj
+
+
+def rand_port():
+ """
+ Get unprivileged port, for now just random, one day we may decide to check if used.
+ """
+ return random.randint(1024, 65535)
+
+
+def wait_port_listen(port, proto="tcp", ns=None, host=None, sleep=0.005, deadline=5):
+ end = time.monotonic() + deadline
+
+ pattern = f":{port:04X} .* "
+ if proto == "tcp": # for tcp protocol additionally check the socket state
+ pattern += "0A"
+ pattern = re.compile(pattern)
+
+ while True:
+ data = cmd(f'cat /proc/net/{proto}*', ns=ns, host=host, shell=True).stdout
+ for row in data.split("\n"):
+ if pattern.search(row):
+ return
+ if time.monotonic() > end:
+ raise Exception("Waiting for port listen timed out")
+ time.sleep(sleep)
More complex tests often have to spawn a background process, like a server which will respond to requests or tcpdump. Add support for creating such processes using the with keyword: with bkg("my-daemon", ..): # my-daemon is alive in this block My initial thought was to add this support to cmd() directly but it runs the command in the constructor, so by the time we __enter__ it's too late to make sure we used "background=True". Second useful helper transplanted from net_helper.sh is wait_port_listen(). The test itself uses socat, which insists on v6 addresses being wrapped in [], it's not the only command which requires this format, so add the wrapped address to env. The hope is to save test code from checking if address is v6. Signed-off-by: Jakub Kicinski <kuba@kernel.org> --- .../selftests/drivers/net/lib/py/env.py | 5 +++ tools/testing/selftests/drivers/net/ping.py | 21 ++++++++- tools/testing/selftests/net/lib/py/utils.py | 43 +++++++++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-)