diff mbox series

[v2,1/1] test: ipc pktio: bind tasks and keep packet order

Message ID 1500375605-10483-2-git-send-email-odpbot@yandex.ru
State New
Headers show
Series [v2,1/1] test: ipc pktio: bind tasks and keep packet order | expand

Commit Message

Github ODP bot July 18, 2017, 11 a.m. UTC
From: Maxim Uvarov <maxim.uvarov@linaro.org>


Tasks which use odp time api have to be bind to cores.
pktio_ipc1 and pktio_ipc2 apps expect counters inside packets
incrementing in order - cache request in buffers to process
such packet can not be allocated immediately. Single reorder
of MAX_PKT_BURST+2 is ok for current example code to find initial
counters value.
https://bugs.linaro.org/show_bug.cgi?id=3126

Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>

---
/** Email created from pull request 79 (muvarov:master_ipc_bug3126)
 ** https://github.com/Linaro/odp/pull/79
 ** Patch: https://github.com/Linaro/odp/pull/79.patch
 ** Base sha: 490f4bf22129638899ce71c99a8847e8ba849692
 ** Merge commit sha: 816269431ff450d009f6e09611952977ce11b776
 **/
 .../linux-generic/include/odp_packet_io_internal.h |  3 +-
 platform/linux-generic/pktio/ipc.c                 | 35 +++++++++++++++-----
 test/linux-generic/pktio_ipc/ipc_common.h          |  4 +++
 test/linux-generic/pktio_ipc/pktio_ipc1.c          | 38 +++++++++++++++-------
 test/linux-generic/pktio_ipc/pktio_ipc2.c          | 24 +++++++++++++-
 5 files changed, 82 insertions(+), 22 deletions(-)
diff mbox series

Patch

diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
index 93040681..20a86557 100644
--- a/platform/linux-generic/include/odp_packet_io_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_internal.h
@@ -85,9 +85,10 @@  typedef	struct {
 		_ring_t	*recv; /**< ODP ring for IPC msg packets
 					    indexes received from shared
 					     memory (from remote process) */
-		_ring_t	*free; /**< ODP ring for IPC msg packets
+		_ring_t	*free; /**< odp ring for ipc msg packets
 					    indexes already processed by
 					    current process */
+		_ring_t	*cache; /**< local cache to keep packet order right */
 	} rx; /* slave */
 	void		*pool_base;		/**< Remote pool base addr */
 	void		*pool_mdata_base;	/**< Remote pool mdata base addr */
diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c
index acdfc4ff..bc7d7564 100644
--- a/platform/linux-generic/pktio/ipc.c
+++ b/platform/linux-generic/pktio/ipc.c
@@ -341,6 +341,10 @@  static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
 
 	odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0);
 
+	pktio_entry->s.ipc.rx.cache = _ring_create("ipc_rx_cache",
+						   PKTIO_IPC_ENTRIES,
+						   _RING_NO_LIST);
+
 	/* Shared info about remote pktio */
 	if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2) {
 		pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_SLAVE;
@@ -437,14 +441,19 @@  static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
 
 	_ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
 
-	r = pktio_entry->s.ipc.rx.recv;
+	/* rx from cache */
+	r = pktio_entry->s.ipc.rx.cache;
 	pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len);
 	if (odp_unlikely(pkts < 0))
 		ODP_ABORT("internal error dequeue\n");
 
-	for (i = 0; i < pkts; i++) {
-		IPC_ODP_DBG("%d/%d recv packet offset %x\n",
-			    i, pkts, offsets[i]);
+	/* rx from other app */
+	if (pkts == 0) {
+		ipcbufs_p = (void *)&offsets[0];
+		r = pktio_entry->s.ipc.rx.recv;
+		pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len);
+		if (odp_unlikely(pkts < 0))
+			ODP_ABORT("internal error dequeue\n");
 	}
 
 	/* fast path */
@@ -473,10 +482,12 @@  static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
 			/* Original pool might be smaller then
 			*  PKTIO_IPC_ENTRIES. If packet can not be
 			 * allocated from pool at this time,
-			 * simple get in on next recv() call.
+			 * simple get in on next recv() call. To keep
+			 * packet ordering store such packets in local
+			 * cache.
 			 */
-			if (i == 0)
-				return 0;
+			IPC_ODP_DBG("unable to allocate packet %d/%d\n",
+				    i, pkts);
 			break;
 		}
 
@@ -507,11 +518,16 @@  static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
 
 	/* put back to rx ring dequed but not processed packets*/
 	if (pkts != i) {
-		r_p = pktio_entry->s.ipc.rx.recv;
 		ipcbufs_p = (void *)&offsets[i];
+		r_p = pktio_entry->s.ipc.rx.cache;
 		pkts_ring = _ring_mp_enqueue_burst(r_p, ipcbufs_p, pkts - i);
+
 		if (pkts_ring != (pkts - i))
-			ODP_ERR("bug to enqueue packets\n");
+			ODP_ABORT("bug to enqueue packets\n");
+
+		if (i == 0)
+			return 0;
+
 	}
 
 	/*num of actually received packets*/
@@ -740,6 +756,7 @@  static int ipc_close(pktio_entry_t *pktio_entry)
 	_ring_destroy(ipc_shm_name);
 	snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", name);
 	_ring_destroy(ipc_shm_name);
+	_ring_destroy("ipc_rx_cache");
 
 	return 0;
 }
diff --git a/test/linux-generic/pktio_ipc/ipc_common.h b/test/linux-generic/pktio_ipc/ipc_common.h
index 62455776..ca78b684 100644
--- a/test/linux-generic/pktio_ipc/ipc_common.h
+++ b/test/linux-generic/pktio_ipc/ipc_common.h
@@ -5,6 +5,10 @@ 
  */
 
 #define _POSIX_C_SOURCE 200809L
+
+#define _GNU_SOURCE
+#include <sched.h>
+
 #include <stdlib.h>
 #include <inttypes.h>
 #include <string.h>
diff --git a/test/linux-generic/pktio_ipc/pktio_ipc1.c b/test/linux-generic/pktio_ipc/pktio_ipc1.c
index 705c205d..5d8434bf 100644
--- a/test/linux-generic/pktio_ipc/pktio_ipc1.c
+++ b/test/linux-generic/pktio_ipc/pktio_ipc1.c
@@ -42,6 +42,7 @@  static int pktio_run_loop(odp_pool_t pool)
 	int ret;
 	odp_pktin_queue_t pktin;
 	char name[30];
+	int sync_cnt = 0;
 
 	if (master_pid)
 		sprintf(name, TEST_IPC_PKTIO_PID_NAME, master_pid);
@@ -178,7 +179,7 @@  static int pktio_run_loop(odp_pool_t pool)
 
 				cnt_recv++;
 
-				if (head.seq != cnt_recv) {
+				if (head.seq != cnt_recv && sync_cnt) {
 					stat_errors++;
 					odp_packet_free(pkt);
 					EXAMPLE_DBG("head.seq %d - "
@@ -187,7 +188,6 @@  static int pktio_run_loop(odp_pool_t pool)
 						    head.seq, cnt_recv,
 						    head.seq - cnt_recv);
 					cnt_recv = head.seq;
-					stat_errors++;
 					stat_free++;
 					continue;
 				}
@@ -213,13 +213,6 @@  static int pktio_run_loop(odp_pool_t pool)
 			pkt_tbl[i] = pkt;
 		}
 
-		/* exit if no packets allocated */
-		if (i == 0) {
-			EXAMPLE_DBG("unable to alloc packet pkts %d/%d\n",
-				    i, pkts);
-			break;
-		}
-
 		pkts = i;
 
 		/* 4. Copy counter and magic numbers to that packets */
@@ -262,6 +255,10 @@  static int pktio_run_loop(odp_pool_t pool)
 		if (odp_time_cmp(odp_time_local_from_ns(ODP_TIME_SEC_IN_NS),
 				 diff) < 0) {
 			current_cycle = cycle;
+			if (!sync_cnt && stat_errors == (MAX_PKT_BURST + 2)) {
+				stat_errors = 0;
+				sync_cnt = 1;
+			}
 			printf("\rpkts:  %" PRIu64 ", alloc  %" PRIu64 ","
 			       " errors %" PRIu64 ", pps  %" PRIu64 ","
 			       " free %" PRIu64 ".",
@@ -287,7 +284,7 @@  static int pktio_run_loop(odp_pool_t pool)
 		return -1;
 	}
 
-	return (stat_errors > 10 || stat_pkts < 1000) ? -1 : 0;
+	return (stat_errors || stat_pkts < 1000) ? -1 : 0;
 }
 
 /**
@@ -299,6 +296,10 @@  int main(int argc, char *argv[])
 	odp_pool_param_t params;
 	odp_instance_t instance;
 	int ret;
+	cpu_set_t cpu_set;
+	odp_cpumask_t mask;
+	int cpu;
+	pid_t pid;
 
 	/* Parse and store the application arguments */
 	parse_args(argc, argv);
@@ -309,8 +310,23 @@  int main(int argc, char *argv[])
 		exit(EXIT_FAILURE);
 	}
 
+	odp_cpumask_default_worker(&mask, 0);
+	cpu = odp_cpumask_first(&mask);
+
+	CPU_ZERO(&cpu_set);
+	CPU_SET(cpu, &cpu_set);
+
+	pid = getpid();
+
+	if (sched_setaffinity(pid, sizeof(cpu_set_t), &cpu_set)) {
+		printf("Set CPU affinity failed.\n");
+		return -1;
+	}
+
+	printf("ipc_pktio1 %d run on cpu %d\n", pid, cpu);
+
 	/* Init this thread */
-	if (odp_init_local(instance, ODP_THREAD_CONTROL)) {
+	if (odp_init_local(instance, ODP_THREAD_WORKER)) {
 		EXAMPLE_ERR("Error: ODP local init failed.\n");
 		exit(EXIT_FAILURE);
 	}
diff --git a/test/linux-generic/pktio_ipc/pktio_ipc2.c b/test/linux-generic/pktio_ipc/pktio_ipc2.c
index daf38413..95d69e80 100644
--- a/test/linux-generic/pktio_ipc/pktio_ipc2.c
+++ b/test/linux-generic/pktio_ipc/pktio_ipc2.c
@@ -208,6 +208,10 @@  int main(int argc, char *argv[])
 {
 	odp_instance_t instance;
 	int ret;
+	cpu_set_t cpu_set;
+	odp_cpumask_t mask;
+	int cpu;
+	pid_t pid;
 
 	/* Parse and store the application arguments */
 	parse_args(argc, argv);
@@ -217,8 +221,26 @@  int main(int argc, char *argv[])
 		exit(EXIT_FAILURE);
 	}
 
+	odp_cpumask_default_worker(&mask, 0);
+	cpu = odp_cpumask_first(&mask);
+	ret = odp_cpumask_next(&mask, cpu);
+	if (ret != -1)
+		cpu = ret;
+
+	CPU_ZERO(&cpu_set);
+	CPU_SET(cpu, &cpu_set);
+
+	pid = getpid();
+
+	if (sched_setaffinity(pid, sizeof(cpu_set_t), &cpu_set)) {
+		printf("Set CPU affinity failed to cpu %d.\n", cpu);
+		return -1;
+	}
+
+	printf("ipc_pktio2 %d run on cpu %d\n", pid, cpu);
+
 	/* Init this thread */
-	if (odp_init_local(instance, ODP_THREAD_CONTROL)) {
+	if (odp_init_local(instance, ODP_THREAD_WORKER)) {
 		EXAMPLE_ERR("Error: ODP local init failed.\n");
 		exit(EXIT_FAILURE);
 	}