@@ -85,9 +85,10 @@ typedef struct {
_ring_t *recv; /**< ODP ring for IPC msg packets
indexes received from shared
memory (from remote process) */
- _ring_t *free; /**< ODP ring for IPC msg packets
+ _ring_t *free; /**< odp ring for ipc msg packets
indexes already processed by
current process */
+ _ring_t *cache; /**< local cache to keep packet order right */
} rx; /* slave */
void *pool_base; /**< Remote pool base addr */
void *pool_mdata_base; /**< Remote pool mdata base addr */
@@ -341,6 +341,10 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0);
+ pktio_entry->s.ipc.rx.cache = _ring_create("ipc_rx_cache",
+ PKTIO_IPC_ENTRIES,
+ _RING_NO_LIST);
+
/* Shared info about remote pktio */
if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2) {
pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_SLAVE;
@@ -437,14 +441,19 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
_ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
- r = pktio_entry->s.ipc.rx.recv;
+ /* rx from cache */
+ r = pktio_entry->s.ipc.rx.cache;
pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len);
if (odp_unlikely(pkts < 0))
ODP_ABORT("internal error dequeue\n");
- for (i = 0; i < pkts; i++) {
- IPC_ODP_DBG("%d/%d recv packet offset %x\n",
- i, pkts, offsets[i]);
+ /* rx from other app */
+ if (pkts == 0) {
+ ipcbufs_p = (void *)&offsets[0];
+ r = pktio_entry->s.ipc.rx.recv;
+ pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len);
+ if (odp_unlikely(pkts < 0))
+ ODP_ABORT("internal error dequeue\n");
}
/* fast path */
@@ -473,10 +482,12 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
/* Original pool might be smaller then
* PKTIO_IPC_ENTRIES. If packet can not be
* allocated from pool at this time,
- * simple get in on next recv() call.
+ * simple get in on next recv() call. To keep
+ * packet ordering store such packets in local
+ * cache.
*/
- if (i == 0)
- return 0;
+ IPC_ODP_DBG("unable to allocate packet %d/%d\n",
+ i, pkts);
break;
}
@@ -507,11 +518,16 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry,
/* put back to rx ring dequed but not processed packets*/
if (pkts != i) {
- r_p = pktio_entry->s.ipc.rx.recv;
ipcbufs_p = (void *)&offsets[i];
+ r_p = pktio_entry->s.ipc.rx.cache;
pkts_ring = _ring_mp_enqueue_burst(r_p, ipcbufs_p, pkts - i);
+
if (pkts_ring != (pkts - i))
- ODP_ERR("bug to enqueue packets\n");
+ ODP_ABORT("bug to enqueue packets\n");
+
+ if (i == 0)
+ return 0;
+
}
/*num of actually received packets*/
@@ -740,6 +756,7 @@ static int ipc_close(pktio_entry_t *pktio_entry)
_ring_destroy(ipc_shm_name);
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", name);
_ring_destroy(ipc_shm_name);
+ _ring_destroy("ipc_rx_cache");
return 0;
}
@@ -5,6 +5,10 @@
*/
#define _POSIX_C_SOURCE 200809L
+
+#define _GNU_SOURCE
+#include <sched.h>
+
#include <stdlib.h>
#include <inttypes.h>
#include <string.h>
@@ -42,6 +42,7 @@ static int pktio_run_loop(odp_pool_t pool)
int ret;
odp_pktin_queue_t pktin;
char name[30];
+ int sync_cnt = 0;
if (master_pid)
sprintf(name, TEST_IPC_PKTIO_PID_NAME, master_pid);
@@ -178,7 +179,7 @@ static int pktio_run_loop(odp_pool_t pool)
cnt_recv++;
- if (head.seq != cnt_recv) {
+ if (head.seq != cnt_recv && sync_cnt) {
stat_errors++;
odp_packet_free(pkt);
EXAMPLE_DBG("head.seq %d - "
@@ -187,7 +188,6 @@ static int pktio_run_loop(odp_pool_t pool)
head.seq, cnt_recv,
head.seq - cnt_recv);
cnt_recv = head.seq;
- stat_errors++;
stat_free++;
continue;
}
@@ -213,13 +213,6 @@ static int pktio_run_loop(odp_pool_t pool)
pkt_tbl[i] = pkt;
}
- /* exit if no packets allocated */
- if (i == 0) {
- EXAMPLE_DBG("unable to alloc packet pkts %d/%d\n",
- i, pkts);
- break;
- }
-
pkts = i;
/* 4. Copy counter and magic numbers to that packets */
@@ -262,6 +255,10 @@ static int pktio_run_loop(odp_pool_t pool)
if (odp_time_cmp(odp_time_local_from_ns(ODP_TIME_SEC_IN_NS),
diff) < 0) {
current_cycle = cycle;
+ if (!sync_cnt && stat_errors == (MAX_PKT_BURST + 2)) {
+ stat_errors = 0;
+ sync_cnt = 1;
+ }
printf("\rpkts: %" PRIu64 ", alloc %" PRIu64 ","
" errors %" PRIu64 ", pps %" PRIu64 ","
" free %" PRIu64 ".",
@@ -287,7 +284,7 @@ static int pktio_run_loop(odp_pool_t pool)
return -1;
}
- return (stat_errors > 10 || stat_pkts < 1000) ? -1 : 0;
+ return (stat_errors || stat_pkts < 1000) ? -1 : 0;
}
/**
@@ -299,6 +296,10 @@ int main(int argc, char *argv[])
odp_pool_param_t params;
odp_instance_t instance;
int ret;
+ cpu_set_t cpu_set;
+ odp_cpumask_t mask;
+ int cpu;
+ pid_t pid;
/* Parse and store the application arguments */
parse_args(argc, argv);
@@ -309,8 +310,23 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
+ odp_cpumask_default_worker(&mask, 0);
+ cpu = odp_cpumask_first(&mask);
+
+ CPU_ZERO(&cpu_set);
+ CPU_SET(cpu, &cpu_set);
+
+ pid = getpid();
+
+ if (sched_setaffinity(pid, sizeof(cpu_set_t), &cpu_set)) {
+ printf("Set CPU affinity failed.\n");
+ return -1;
+ }
+
+ printf("ipc_pktio1 %d run on cpu %d\n", pid, cpu);
+
/* Init this thread */
- if (odp_init_local(instance, ODP_THREAD_CONTROL)) {
+ if (odp_init_local(instance, ODP_THREAD_WORKER)) {
EXAMPLE_ERR("Error: ODP local init failed.\n");
exit(EXIT_FAILURE);
}
@@ -208,6 +208,10 @@ int main(int argc, char *argv[])
{
odp_instance_t instance;
int ret;
+ cpu_set_t cpu_set;
+ odp_cpumask_t mask;
+ int cpu;
+ pid_t pid;
/* Parse and store the application arguments */
parse_args(argc, argv);
@@ -217,8 +221,26 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
+ odp_cpumask_default_worker(&mask, 0);
+ cpu = odp_cpumask_first(&mask);
+ ret = odp_cpumask_next(&mask, cpu);
+ if (ret != -1)
+ cpu = ret;
+
+ CPU_ZERO(&cpu_set);
+ CPU_SET(cpu, &cpu_set);
+
+ pid = getpid();
+
+ if (sched_setaffinity(pid, sizeof(cpu_set_t), &cpu_set)) {
+ printf("Set CPU affinity failed to cpu %d.\n", cpu);
+ return -1;
+ }
+
+ printf("ipc_pktio2 %d run on cpu %d\n", pid, cpu);
+
/* Init this thread */
- if (odp_init_local(instance, ODP_THREAD_CONTROL)) {
+ if (odp_init_local(instance, ODP_THREAD_WORKER)) {
EXAMPLE_ERR("Error: ODP local init failed.\n");
exit(EXIT_FAILURE);
}