@@ -110,6 +110,10 @@ from .remote import Remote
self.addr = self.v6 if self.v6 else self.v4
self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
+ # Bracketed addresses, some commands need IPv6 to be inside []
+ self.baddr = f"[{self.v6}]" if self.v6 else self.v4
+ self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4
+
self.ifname = self.dev['ifname']
self.ifindex = self.dev['ifindex']
@@ -1,9 +1,12 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
+import random
+
from lib.py import ksft_run, ksft_exit
+from lib.py import ksft_eq
from lib.py import NetDrvEpEnv
-from lib.py import cmd
+from lib.py import bkg, cmd, wait_port_listen
def test_v4(cfg) -> None:
@@ -16,6 +19,25 @@ from lib.py import cmd
cmd(f"ping -c 1 -W0.5 {cfg.v6}", host=cfg.remote)
+def test_tcp(cfg) -> None:
+ port = random.randrange(1024 + (1 << 15))
+
+ with bkg(f"socat -t 2 -u TCP-LISTEN:{port} STDOUT", exit_wait=True) as nc:
+ wait_port_listen(port)
+
+ cmd(f"echo ping | socat -t 2 -u STDIN TCP:{cfg.baddr}:{port}",
+ shell=True, host=cfg.remote)
+ ksft_eq(nc.stdout.strip(), "ping")
+
+ port = random.randrange(1024 + (1 << 15))
+ with bkg(f"socat -t 2 -u TCP-LISTEN:{port} STDOUT", host=cfg.remote,
+ exit_wait=True) as nc:
+ wait_port_listen(port, host=cfg.remote)
+
+ cmd(f"echo ping | socat -t 2 -u STDIN TCP:{cfg.remote_baddr}:{port}", shell=True)
+ ksft_eq(nc.stdout.strip(), "ping")
+
+
def main() -> None:
with NetDrvEpEnv(__file__) as cfg:
ksft_run(globs=globals(), case_pfx={"test_"}, args=(cfg, ))
@@ -1,7 +1,10 @@
# SPDX-License-Identifier: GPL-2.0
import json as _json
+import re
import subprocess
+import time
+
class cmd:
def __init__(self, comm, shell=True, fail=True, ns=None, background=False, host=None):
@@ -38,6 +41,20 @@ import subprocess
(self.proc.args, stdout, stderr))
+class bkg(cmd):
+ def __init__(self, comm, shell=True, fail=True, ns=None, host=None,
+ exit_wait=False):
+ super().__init__(comm, background=True,
+ shell=shell, fail=fail, ns=ns, host=host)
+ self.terminate = not exit_wait
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, ex_type, ex_value, ex_tb):
+ return self.process(terminate=self.terminate)
+
+
def ip(args, json=None, ns=None, host=None):
cmd_str = "ip "
if json:
@@ -47,3 +64,21 @@ import subprocess
if json:
return _json.loads(cmd_obj.stdout)
return cmd_obj
+
+
+def wait_port_listen(port, proto="tcp", ns=None, host=None, sleep=0.005, deadline=5):
+ end = time.monotonic() + deadline
+
+ pattern = f":{port:04X} .* "
+ if proto == "tcp": # for tcp protocol additionally check the socket state
+ pattern += "0A"
+ pattern = re.compile(pattern)
+
+ while True:
+ data = cmd(f'cat /proc/net/{proto}*', ns=ns, host=host, shell=True).stdout
+ for row in data.split("\n"):
+ if pattern.search(row):
+ return
+ if time.monotonic() > end:
+ raise Exception("Waiting for port listen timed out")
+ time.sleep(sleep)
More complex tests often have to spawn a background process, like a server which will respond to requests or tcpdump. Add support for creating such processes using the with keyword: with bkg("my-daemon", ..): # my-daemon is alive in this block My initial thought was to add this support to cmd() directly but it runs the command in the constructor, so by the time we __enter__ it's too late to make sure we used "background=True". Second useful helper transplanted from net_helper.sh is wait_port_listen(). The test itself uses socat, which insists on v6 addresses being wrapped in [], it's not the only command which requires this format, so add the wrapped address to env. The hope is to save test code from checking if address is v6. Signed-off-by: Jakub Kicinski <kuba@kernel.org> --- .../selftests/drivers/net/lib/py/env.py | 4 +++ tools/testing/selftests/drivers/net/ping.py | 24 ++++++++++++- tools/testing/selftests/net/lib/py/utils.py | 35 +++++++++++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-)