diff mbox series

[v5,bpf-next,09/17] selftests: xsk: split worker thread

Message ID 20210329224316.17793-10-maciej.fijalkowski@intel.com
State New
Headers show
Series AF_XDP selftests improvements & bpf_link | expand

Commit Message

Maciej Fijalkowski March 29, 2021, 10:43 p.m. UTC
Let's a have a separate Tx/Rx worker threads instead of a one common
thread packed with Tx/Rx specific checks.

Move mmap for umem buffer space and a switch_namespace() call to
thread_common_ops.

This also allows for a bunch of simplifactions that are the subject of
the next commits. The final result will be a code base that is much
easier to follow.

Signed-off-by: Maciej Fijalkowski <maciej.fijalkowski@intel.com>
---
 tools/testing/selftests/bpf/xdpxceiver.c | 156 +++++++++++------------
 1 file changed, 77 insertions(+), 79 deletions(-)
diff mbox series

Patch

diff --git a/tools/testing/selftests/bpf/xdpxceiver.c b/tools/testing/selftests/bpf/xdpxceiver.c
index 2e5f536563e4..b6fd5eb1d620 100644
--- a/tools/testing/selftests/bpf/xdpxceiver.c
+++ b/tools/testing/selftests/bpf/xdpxceiver.c
@@ -760,6 +760,15 @@  static void thread_common_ops(struct ifobject *ifobject, void *bufs, pthread_mut
 	int ctr = 0;
 	int ret;
 
+	pthread_attr_setstacksize(&attr, THREAD_STACK);
+
+	bufs = mmap(NULL, num_frames * XSK_UMEM__DEFAULT_FRAME_SIZE,
+		    PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
+	if (bufs == MAP_FAILED)
+		exit_with_error(errno);
+
+	ifobject->ns_fd = switch_namespace(ifobject->nsname);
+
 	xsk_configure_umem(ifobject, bufs, num_frames * XSK_UMEM__DEFAULT_FRAME_SIZE);
 	ret = xsk_configure_socket(ifobject);
 
@@ -782,9 +791,12 @@  static void thread_common_ops(struct ifobject *ifobject, void *bufs, pthread_mut
 
 	if (ctr >= SOCK_RECONF_CTR)
 		exit_with_error(ret);
+
+	print_verbose("Interface [%s] vector [%s]\n",
+		      ifobject->ifname, ifobject->fv.vector == tx ? "Tx" : "Rx");
 }
 
-static void *worker_testapp_validate(void *arg)
+static void *worker_testapp_validate_tx(void *arg)
 {
 	struct udphdr *udp_hdr =
 	    (struct udphdr *)(pkt_data + sizeof(struct ethhdr) + sizeof(struct iphdr));
@@ -792,97 +804,83 @@  static void *worker_testapp_validate(void *arg)
 	struct ethhdr *eth_hdr = (struct ethhdr *)pkt_data;
 	struct ifobject *ifobject = (struct ifobject *)arg;
 	struct generic_data data;
+	int spinningrxctr = 0;
 	void *bufs = NULL;
 
-	pthread_attr_setstacksize(&attr, THREAD_STACK);
-
-	if (!bidi_pass) {
-		bufs = mmap(NULL, num_frames * XSK_UMEM__DEFAULT_FRAME_SIZE,
-			    PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
-		if (bufs == MAP_FAILED)
-			exit_with_error(errno);
+	if (!bidi_pass)
+		thread_common_ops(ifobject, bufs, &sync_mutex_tx, &spinning_tx);
 
-		ifobject->ns_fd = switch_namespace(ifobject->nsname);
+	while (atomic_load(&spinning_rx) && spinningrxctr < SOCK_RECONF_CTR) {
+		spinningrxctr++;
+		usleep(USLEEP_MAX);
 	}
 
-	if (ifobject->fv.vector == tx) {
-		int spinningrxctr = 0;
+	for (int i = 0; i < num_frames; i++) {
+		/*send EOT frame */
+		if (i == (num_frames - 1))
+			data.seqnum = -1;
+		else
+			data.seqnum = i;
+		gen_udp_hdr(&data, ifobject, udp_hdr);
+		gen_ip_hdr(ifobject, ip_hdr);
+		gen_udp_csum(udp_hdr, ip_hdr);
+		gen_eth_hdr(ifobject, eth_hdr);
+		gen_eth_frame(ifobject->umem, i * XSK_UMEM__DEFAULT_FRAME_SIZE);
+	}
 
-		if (!bidi_pass)
-			thread_common_ops(ifobject, bufs, &sync_mutex_tx, &spinning_tx);
+	print_verbose("Sending %d packets on interface %s\n",
+		      (opt_pkt_count - 1), ifobject->ifname);
+	tx_only_all(ifobject);
 
-		while (atomic_load(&spinning_rx) && spinningrxctr < SOCK_RECONF_CTR) {
-			spinningrxctr++;
-			usleep(USLEEP_MAX);
-		}
+	if (test_type != TEST_TYPE_BIDI || bidi_pass) {
+		xsk_socket__delete(ifobject->xsk->xsk);
+		(void)xsk_umem__delete(ifobject->umem->umem);
+	}
+	pthread_exit(NULL);
+}
 
-		print_verbose("Interface [%s] vector [Tx]\n", ifobject->ifname);
-		for (int i = 0; i < num_frames; i++) {
-			/*send EOT frame */
-			if (i == (num_frames - 1))
-				data.seqnum = -1;
-			else
-				data.seqnum = i;
-			gen_udp_hdr(&data, ifobject, udp_hdr);
-			gen_ip_hdr(ifobject, ip_hdr);
-			gen_udp_csum(udp_hdr, ip_hdr);
-			gen_eth_hdr(ifobject, eth_hdr);
-			gen_eth_frame(ifobject->umem, i * XSK_UMEM__DEFAULT_FRAME_SIZE);
-		}
+static void *worker_testapp_validate_rx(void *arg)
+{
+	struct ifobject *ifobject = (struct ifobject *)arg;
+	struct pollfd fds[MAX_SOCKS] = { };
+	void *bufs = NULL;
 
-		print_verbose("Sending %d packets on interface %s\n",
-			       (opt_pkt_count - 1), ifobject->ifname);
-		tx_only_all(ifobject);
-	} else if (ifobject->fv.vector == rx) {
-		struct pollfd fds[MAX_SOCKS] = { };
-		int ret;
-
-		if (!bidi_pass)
-			thread_common_ops(ifobject, bufs, &sync_mutex_tx, &spinning_rx);
-
-		print_verbose("Interface [%s] vector [Rx]\n", ifobject->ifname);
-		if (stat_test_type != STAT_TEST_RX_FILL_EMPTY)
-			xsk_populate_fill_ring(ifobject->umem);
-
-		TAILQ_INIT(&head);
-		if (debug_pkt_dump) {
-			pkt_buf = calloc(num_frames, sizeof(*pkt_buf));
-			if (!pkt_buf)
-				exit_with_error(errno);
-		}
+	if (!bidi_pass)
+		thread_common_ops(ifobject, bufs, &sync_mutex_tx, &spinning_rx);
 
-		fds[0].fd = xsk_socket__fd(ifobject->xsk->xsk);
-		fds[0].events = POLLIN;
+	if (stat_test_type != STAT_TEST_RX_FILL_EMPTY)
+		xsk_populate_fill_ring(ifobject->umem);
 
-		pthread_mutex_lock(&sync_mutex);
-		pthread_cond_signal(&signal_rx_condition);
-		pthread_mutex_unlock(&sync_mutex);
+	TAILQ_INIT(&head);
+	if (debug_pkt_dump) {
+		pkt_buf = calloc(num_frames, sizeof(*pkt_buf));
+		if (!pkt_buf)
+			exit_with_error(errno);
+	}
 
-		while (1) {
-			if (test_type == TEST_TYPE_POLL) {
-				ret = poll(fds, 1, POLL_TMOUT);
-				if (ret <= 0)
-					continue;
-			}
+	fds[0].fd = xsk_socket__fd(ifobject->xsk->xsk);
+	fds[0].events = POLLIN;
 
-			if (test_type != TEST_TYPE_STATS) {
-				rx_pkt(ifobject->xsk, fds);
-				worker_pkt_validate();
-			} else {
-				worker_stats_validate(ifobject);
-			}
+	pthread_mutex_lock(&sync_mutex);
+	pthread_cond_signal(&signal_rx_condition);
+	pthread_mutex_unlock(&sync_mutex);
 
-			if (sigvar)
-				break;
+	while (1) {
+		if (test_type != TEST_TYPE_STATS) {
+			rx_pkt(ifobject->xsk, fds);
+			worker_pkt_validate();
+		} else {
+			worker_stats_validate(ifobject);
 		}
+		if (sigvar)
+			break;
+	}
 
-		if (test_type != TEST_TYPE_STATS)
-			print_verbose("Received %d packets on interface %s\n",
-				pkt_counter, ifobject->ifname);
+	print_verbose("Received %d packets on interface %s\n",
+		      pkt_counter, ifobject->ifname);
 
-		if (test_type == TEST_TYPE_TEARDOWN)
-			print_verbose("Destroying socket\n");
-	}
+	if (test_type == TEST_TYPE_TEARDOWN)
+		print_verbose("Destroying socket\n");
 
 	if ((test_type != TEST_TYPE_BIDI) || bidi_pass) {
 		xsk_socket__delete(ifobject->xsk->xsk);
@@ -911,12 +909,12 @@  static void testapp_validate(void)
 
 	/*Spawn RX thread */
 	if (!bidi || !bidi_pass) {
-		if (pthread_create(&t0, &attr, worker_testapp_validate, ifdict[1]))
+		if (pthread_create(&t0, &attr, worker_testapp_validate_rx, ifdict[1]))
 			exit_with_error(errno);
 	} else if (bidi && bidi_pass) {
 		/*switch Tx/Rx vectors */
 		ifdict[0]->fv.vector = rx;
-		if (pthread_create(&t0, &attr, worker_testapp_validate, ifdict[0]))
+		if (pthread_create(&t0, &attr, worker_testapp_validate_rx, ifdict[0]))
 			exit_with_error(errno);
 	}
 
@@ -931,12 +929,12 @@  static void testapp_validate(void)
 
 	/*Spawn TX thread */
 	if (!bidi || !bidi_pass) {
-		if (pthread_create(&t1, &attr, worker_testapp_validate, ifdict[0]))
+		if (pthread_create(&t1, &attr, worker_testapp_validate_tx, ifdict[0]))
 			exit_with_error(errno);
 	} else if (bidi && bidi_pass) {
 		/*switch Tx/Rx vectors */
 		ifdict[1]->fv.vector = tx;
-		if (pthread_create(&t1, &attr, worker_testapp_validate, ifdict[1]))
+		if (pthread_create(&t1, &attr, worker_testapp_validate_tx, ifdict[1]))
 			exit_with_error(errno);
 	}